Repository: helix Updated Branches: refs/heads/master 5383e7318 -> b6fd8cb83
[HELIX-675] Refactor controller start/cleanup logic to ensure monitor register/reset is handled correctly in any event order. Because events can be processed in different orders, the controller init event might be processed later or earlier than expected. This causes inconsistency when the event handler thread processes events and records information in the cluster monitor. This change ensures the cluster monitor is off when leadership changes to another node, so no extra metric data will be generated. Also upgrade related test cases to verify MBean counts. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b6fd8cb8 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b6fd8cb8 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b6fd8cb8 Branch: refs/heads/master Commit: b6fd8cb8396e1453afba404b669534883ce796a7 Parents: 5383e73 Author: jiajunwang <ericwang1...@gmail.com> Authored: Thu Mar 8 16:51:37 2018 -0800 Committer: jiajunwang <ericwang1...@gmail.com> Committed: Tue Mar 13 15:55:44 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 262 +++++++++---------- .../stages/CurrentStateComputationStage.java | 11 +- .../stages/ExternalViewComputeStage.java | 55 ++-- .../controller/stages/TaskAssignmentStage.java | 4 +- .../manager/zk/DistributedLeaderElection.java | 5 +- .../monitoring/mbeans/ClusterStatusMonitor.java | 37 +-- .../org/apache/helix/task/TaskRebalancer.java | 14 +- .../TestClusterEventStatusMonitor.java | 1 + .../TestClusterStatusMonitorLifecycle.java | 144 +++++----- .../mbeans/TestClusterStatusMonitor.java | 1 + .../mbeans/TestRebalancerMetrics.java | 8 +- .../mbeans/TestTopStateHandoffMetrics.java | 4 +- 12 files changed, 284 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 2546bd2..933aa3e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -19,18 +19,6 @@ package org.apache.helix.controller; * under the License. */ -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicReference; - import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -38,51 +26,25 @@ import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; -import org.apache.helix.api.listeners.ClusterConfigChangeListener; -import org.apache.helix.api.listeners.ControllerChangeListener; -import org.apache.helix.api.listeners.CurrentStateChangeListener; -import org.apache.helix.api.listeners.IdealStateChangeListener; -import org.apache.helix.api.listeners.InstanceConfigChangeListener; -import org.apache.helix.api.listeners.LiveInstanceChangeListener; -import org.apache.helix.api.listeners.MessageListener; -import org.apache.helix.api.listeners.PreFetch; -import org.apache.helix.api.listeners.ResourceConfigChangeListener; +import org.apache.helix.api.listeners.*; 
+import org.apache.helix.common.ClusterEventBlockingQueue; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; -import org.apache.helix.controller.stages.AttributeName; -import org.apache.helix.controller.stages.BestPossibleStateCalcStage; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.ClusterEvent; -import org.apache.helix.common.ClusterEventBlockingQueue; -import org.apache.helix.controller.stages.ClusterEventType; -import org.apache.helix.controller.stages.CompatibilityCheckStage; -import org.apache.helix.controller.stages.CurrentStateComputationStage; -import org.apache.helix.controller.stages.ExternalViewComputeStage; -import org.apache.helix.controller.stages.IntermediateStateCalcStage; -import org.apache.helix.controller.stages.MessageGenerationPhase; -import org.apache.helix.controller.stages.MessageSelectionStage; -import org.apache.helix.controller.stages.MessageThrottleStage; -import org.apache.helix.controller.stages.PersistAssignmentStage; -import org.apache.helix.controller.stages.ReadClusterDataStage; -import org.apache.helix.controller.stages.ResourceComputationStage; -import org.apache.helix.controller.stages.ResourceValidationStage; -import org.apache.helix.controller.stages.TargetExteralViewCalcStage; -import org.apache.helix.controller.stages.TaskAssignmentStage; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.MaintenanceSignal; -import org.apache.helix.model.Message; -import org.apache.helix.model.PauseSignal; -import org.apache.helix.model.ResourceConfig; +import org.apache.helix.controller.stages.*; +import org.apache.helix.model.*; import org.apache.helix.monitoring.mbeans.ClusterEventMonitor; import 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.TaskDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.helix.HelixConstants.ChangeType; /** @@ -113,7 +75,10 @@ public class GenericHelixController implements IdealStateChangeListener, final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances; final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions; - ClusterStatusMonitor _clusterStatusMonitor; + // By default not reporting status until controller status is changed to activate + // TODO This flag should be inside ClusterStatusMonitor. When false, no MBean registering. + private boolean _isMonitoring = false; + private final ClusterStatusMonitor _clusterStatusMonitor; /** * A queue for controller events and a thread that will consume it @@ -192,10 +157,10 @@ public class GenericHelixController implements IdealStateChangeListener, NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); - ClusterEvent event = new ClusterEvent(_clusterName,ClusterEventType.PeriodicalRebalance); + ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.PeriodicalRebalance); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); - List<ZNRecord> dummy = new ArrayList<ZNRecord>(); + List<ZNRecord> dummy = new ArrayList<>(); event.addAttribute(AttributeName.eventData.name(), dummy); // Should be able to process _eventQueue.put(event); @@ -311,6 +276,8 @@ public class GenericHelixController implements IdealStateChangeListener, initPipelines(_eventThread, _cache, false); 
initPipelines(_taskEventThread, _taskCache, true); + + _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName); } /** @@ -331,6 +298,9 @@ public class GenericHelixController implements IdealStateChangeListener, return; } + // TODO If init controller with paused = true, it may not take effect immediately + // _paused is default false. If any events come before controllerChangeEvent, the controller + // will be excuting in un-paused mode. Which might not be the config in ZK. if (_paused) { logger.info("Cluster " + manager.getClusterName() + " is paused. Ignoring the event:" + event .getEventType()); @@ -342,38 +312,28 @@ public class GenericHelixController implements IdealStateChangeListener, context = event.getAttribute(AttributeName.changeContext.name()); } - // Initialize _clusterStatusMonitor if (context != null) { if (context.getType() == Type.FINALIZE) { stopRebalancingTimer(); logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType()); return; } else { - synchronized (this) { - if (_clusterStatusMonitor == null) { - _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName()); - } - } - // TODO: should be in the initization of controller. + // TODO: should be in the initialization of controller. if (_cache != null) { checkRebalancingTimer(manager, Collections.EMPTY_LIST, _cache.getClusterConfig()); } - - if (cache.isTaskCache()) { - TaskDriver driver = new TaskDriver(manager); - _clusterStatusMonitor.refreshWorkflowsStatus(driver); - _clusterStatusMonitor.refreshJobsStatus(driver); + if (_isMonitoring) { + event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor); } - event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor); } } // add the cache event.addAttribute(AttributeName.ClusterDataCache.name(), cache); - List<Pipeline> pipelines = cache.isTaskCache() - ? 
_taskRegistry.getPipelinesForEvent(event.getEventType()) - : _registry.getPipelinesForEvent(event.getEventType()); + List<Pipeline> pipelines = cache.isTaskCache() ? + _taskRegistry.getPipelinesForEvent(event.getEventType()) : + _registry.getPipelinesForEvent(event.getEventType()); if (pipelines == null || pipelines.size() == 0) { logger.info( @@ -404,40 +364,51 @@ public class GenericHelixController implements IdealStateChangeListener, if (!cache.isTaskCache()) { // report event process durations - if (_clusterStatusMonitor != null) { - NotificationContext notificationContext = - event.getAttribute(AttributeName.changeContext.name()); - long enqueueTime = event.getCreationTime(); - long zkCallbackTime; - StringBuilder sb = new StringBuilder(); - if (notificationContext != null) { - zkCallbackTime = notificationContext.getCreationTime(); + NotificationContext notificationContext = + event.getAttribute(AttributeName.changeContext.name()); + long enqueueTime = event.getCreationTime(); + long zkCallbackTime; + StringBuilder sb = new StringBuilder(); + if (notificationContext != null) { + zkCallbackTime = notificationContext.getCreationTime(); + if (_isMonitoring) { _clusterStatusMonitor .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - zkCallbackTime); - sb.append(String.format( - "Callback time for event: " + event.getEventType() + " took: " + (enqueueTime - - zkCallbackTime) + " ms\n")); - } + sb.append(String.format( + "Callback time for event: " + event.getEventType() + " took: " + (enqueueTime + - zkCallbackTime) + " ms\n")); + } + if (_isMonitoring) { _clusterStatusMonitor .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(), startTime - enqueueTime); _clusterStatusMonitor .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(), endTime - startTime); - sb.append(String.format( - "InQueue time for event: " + event.getEventType() + " took: " + (startTime - - enqueueTime) + " 
ms\n")); - sb.append(String.format( - "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime - - startTime) + " ms")); - logger.info(sb.toString()); } - } + sb.append(String.format( + "InQueue time for event: " + event.getEventType() + " took: " + (startTime - enqueueTime) + + " ms\n")); + sb.append(String.format( + "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime + - startTime) + " ms")); + logger.info(sb.toString()); + } else if (_isMonitoring) { + // report workflow status + TaskDriver driver = new TaskDriver(manager); + _clusterStatusMonitor.refreshWorkflowsStatus(driver); + _clusterStatusMonitor.refreshJobsStatus(driver); + } + + // If event handling happens before controller deactivate, the process may write unnecessary + // MBeans to monitoring after the monitor is disabled. + // So reset ClusterStatusMonitor according to it's status after all event handling. + // TODO remove this once clusterStatusMonitor blocks any MBean register on isMonitoring = false. 
+ resetClusterStatusMonitor(); } - @Override @PreFetch(enabled = false) public void onStateChange(String instanceName, List<CurrentState> statesInfo, @@ -457,7 +428,8 @@ public class GenericHelixController implements IdealStateChangeListener, notifyCaches(changeContext, ChangeType.MESSAGE); pushToEventQueues(ClusterEventType.MessageChange, changeContext, Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName)); - if (_clusterStatusMonitor != null && messages != null) { + + if (_isMonitoring && messages != null) { _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size()); } @@ -608,42 +580,36 @@ public class GenericHelixController implements IdealStateChangeListener, @Override public void onControllerChange(NotificationContext changeContext) { logger.info("START: GenericClusterController.onControllerChange() for cluster " + _clusterName); + _cache.requireFullRefresh(); _taskCache.requireFullRefresh(); - if (changeContext != null && changeContext.getType() == Type.FINALIZE) { - logger.info("GenericClusterController.onControllerChange() FINALIZE for cluster " + _clusterName); - return; - } - HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); - // double check if this controller is the leader - Builder keyBuilder = accessor.keyBuilder(); - LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader()); - if (leader == null) { - logger - .warn("No controller exists for cluster:" + changeContext.getManager().getClusterName()); - return; - } else { - String leaderName = leader.getInstanceName(); + boolean controllerIsLeader; - String instanceName = changeContext.getManager().getInstanceName(); - if (leaderName == null || !leaderName.equals(instanceName)) { - logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: " + leader); - return; - } + if (changeContext != null && changeContext.getType() == Type.FINALIZE) { + logger.info( + 
"GenericClusterController.onControllerChange() FINALIZE for cluster " + _clusterName); + controllerIsLeader = false; + } else { + // double check if this controller is the leader + controllerIsLeader = changeContext.getManager().isLeader(); + } + + HelixManager manager = changeContext.getManager(); + if (controllerIsLeader) { + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause()); + MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance()); + _paused = updateControllerState(changeContext, pauseSignal, _paused); + _inMaintenanceMode = + updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode); + enableClusterStatusMonitor(true); + _clusterStatusMonitor.setEnabled(!_paused); + } else { + enableClusterStatusMonitor(false); } - PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause()); - MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance()); - _paused = updateControllerState(changeContext, pauseSignal, _paused); - _inMaintenanceMode = updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode); - - synchronized (this) { - if (_clusterStatusMonitor == null) { - _clusterStatusMonitor = new ClusterStatusMonitor(changeContext.getManager().getClusterName()); - } - } - _clusterStatusMonitor.setEnabled(!_paused); logger.info("END: GenericClusterController.onControllerChange() for cluster " + _clusterName); } @@ -656,7 +622,7 @@ public class GenericHelixController implements IdealStateChangeListener, NotificationContext changeContext) { // construct maps for current live-instances - Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>(); + Map<String, LiveInstance> curInstances = new HashMap<>(); Map<String, LiveInstance> curSessions = new HashMap<>(); for (LiveInstance liveInstance : liveInstances) { 
curInstances.put(liveInstance.getInstanceName(), liveInstance); @@ -723,21 +689,51 @@ public class GenericHelixController implements IdealStateChangeListener, } } - public void shutdownClusterStatusMonitor(String clusterName) { - if (_clusterStatusMonitor != null) { - logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName); - _clusterStatusMonitor.reset(); - _clusterStatusMonitor = null; - } - } - public void shutdown() throws InterruptedException { stopRebalancingTimer(); terminateEventThread(_eventThread); terminateEventThread(_taskEventThread); - _asyncTasksThreadPool.shutdown(); + // shutdown asycTasksThreadpool and wait for terminate. + _asyncTasksThreadPool.shutdownNow(); + try { + _asyncTasksThreadPool.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + logger.warn("Timeout when terminating async tasks. Some async tasks are still executing."); + } + + enableClusterStatusMonitor(false); + + // TODO controller shouldn't be used in anyway after shutdown. + // Need to record shutdown and throw Exception if the controller is used again. + } + + private void enableClusterStatusMonitor(boolean enable) { + synchronized (_clusterStatusMonitor) { + if (_isMonitoring != enable) { + // monitoring state changed + if (enable) { + logger.info("Enable clusterStatusMonitor for cluster " + _clusterName); + _clusterStatusMonitor.active(); + } else { + logger.info("Disable clusterStatusMonitor for cluster " + _clusterName); + // Reset will be done if (_isMonitoring = false) later, no matter if the state is changed or not. + } + _isMonitoring = enable; + } + // Due to multithreads processing, async thread may write to monitor even it is closed. + // So when it is disabled, always try to clear the monitor. 
+ resetClusterStatusMonitor(); + } + } + + private void resetClusterStatusMonitor() { + synchronized (_clusterStatusMonitor) { + if (!_isMonitoring) { + _clusterStatusMonitor.reset(); + } + } } private void terminateEventThread(Thread thread) throws InterruptedException { @@ -784,12 +780,12 @@ public class GenericHelixController implements IdealStateChangeListener, _eventBlockingQueue = eventBlockingQueue; } - @Override public void run() { + @Override + public void run() { logger.info("START ClusterEventProcessor thread for cluster " + _clusterName); while (!isInterrupted()) { try { - ClusterEvent event = _eventBlockingQueue.take(); - handleEvent(event, _cache); + handleEvent(_eventBlockingQueue.take(), _cache); } catch (InterruptedException e) { logger.warn("ClusterEventProcessor interrupted", e); interrupt(); http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 3c2c977..f8f5a2f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -219,8 +219,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage { long startTime = missingTopStateMap.get(resourceName).get(partitionName); if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) { missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED); - clusterStatusMonitor - .updateMissingTopStateDurationStats(resourceName, 0L, false); + if (clusterStatusMonitor != null) { + clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 
0L, false); + } } } } @@ -294,8 +295,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage { if (handOffStartTime != TRANSITION_FAILED && handOffEndTime - handOffStartTime <= threshold) { LOG.info(String.format("Missing topstate duration is %d for partition %s", handOffEndTime - handOffStartTime, partition.getPartitionName())); - clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(), - handOffEndTime - handOffStartTime, true); + if (clusterStatusMonitor != null) { + clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(), + handOffEndTime - handOffStartTime, true); + } } removeFromStatsMap(missingTopStateMap, resource, partition); } http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 8d1a48c..9e91ba5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -19,39 +19,20 @@ package org.apache.helix.controller.stages; * under the License. 
*/ -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixDefinedState; -import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; +import org.apache.helix.*; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.ZNRecord; -import org.apache.helix.ZNRecordDelta; import org.apache.helix.ZNRecordDelta.MergeOperation; -import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; +import org.apache.helix.model.*; import org.apache.helix.model.Message.MessageType; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceConfig; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.model.StatusUpdate; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; + public class ExternalViewComputeStage extends AbstractBaseStage { private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class); @@ -106,22 +87,24 @@ public class ExternalViewComputeStage extends AbstractBaseStage { // Update cluster status monitor mbean IdealState idealState = cache.getIdealState(resourceName); if (!cache.isTaskCache()) { + ResourceConfig resourceConfig = cache.getResourceConfig(resourceName); ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); - ResourceConfig resourceConfig = 
cache.getResourceConfig(resourceName); - if (idealState != null && (resourceConfig == null || !resourceConfig - .isMonitoringDisabled())) { - if (clusterStatusMonitor != null && !idealState.getStateModelDefRef() - .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { - StateModelDefinition stateModelDef = - cache.getStateModelDef(idealState.getStateModelDefRef()); - clusterStatusMonitor - .setResourceStatus(view, cache.getIdealState(view.getResourceName()), - stateModelDef); + if (clusterStatusMonitor != null) { + if (idealState != null && (resourceConfig == null || !resourceConfig + .isMonitoringDisabled())) { + if (!idealState.getStateModelDefRef() + .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { + StateModelDefinition stateModelDef = + cache.getStateModelDef(idealState.getStateModelDefRef()); + clusterStatusMonitor + .setResourceStatus(view, cache.getIdealState(view.getResourceName()), + stateModelDef); + } + } else { + // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true. + clusterStatusMonitor.unregisterResource(view.getResourceName()); } - } else { - // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true. 
- clusterStatusMonitor.unregisterResource(view.getResourceName()); } } ExternalView curExtView = curExtViews.get(resourceName); http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index ed6ede2..d8ccd0f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -77,7 +77,9 @@ public class TaskAssignmentStage extends AbstractBaseStage { if (!cache.isTaskCache()) { ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); - clusterStatusMonitor.increaseMessageReceived(outputMessages); + if (clusterStatusMonitor != null) { + clusterStatusMonitor.increaseMessageReceived(outputMessages); + } } long cacheStart = System.currentTimeMillis(); cache.cacheMessages(outputMessages); http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java index edc467b..6022edd 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java @@ -22,7 +22,6 @@ package org.apache.helix.manager.zk; import java.lang.management.ManagementFactory; import java.util.List; -import org.apache.helix.ControllerChangeListener; import 
org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixTimerTask; @@ -30,6 +29,7 @@ import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyType; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.api.listeners.ControllerChangeListener; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.model.LeaderHistory; import org.apache.helix.model.LiveInstance; @@ -97,11 +97,10 @@ public class DistributedLeaderElection implements ControllerChangeListener { } } } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) { - LOG.info(_manager.getInstanceName() + " reqlinquish leadership for cluster: " + LOG.info(_manager.getInstanceName() + " relinquish leadership for cluster: " + _manager.getClusterName()); controllerHelper.stopControllerTimerTasks(); controllerHelper.removeListenersFromController(_controller); - _controller.shutdownClusterStatusMonitor(_manager.getClusterName()); /** * clear write-through cache http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 2a99341..fe682ac 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -85,11 +85,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { public ClusterStatusMonitor(String clusterName) { _clusterName = clusterName; _beanServer = ManagementFactory.getPlatformMBeanServer(); - try { - register(this, getObjectName(clusterBeanName())); - } 
catch (Exception e) { - LOG.error("Fail to regiter ClusterStatusMonitor", e); - } } public ObjectName getObjectName(String name) throws MalformedObjectNameException { @@ -275,20 +270,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) { - if (!_clusterEventMbeanMap.containsKey(phase)) { - ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase); - try { - ClusterEventMonitor prevEventMbean = _clusterEventMbeanMap.put(phase, monitor); - if (prevEventMbean != null) { - prevEventMbean.unregister(); + try { + if (!_clusterEventMbeanMap.containsKey(phase)) { + synchronized (this) { + if (!_clusterEventMbeanMap.containsKey(phase)) { + ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase); + monitor.register(); + _clusterEventMbeanMap.put(phase, monitor); + } } - monitor.register(); - } catch (JMException e) { - LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + _clusterName - + " and phase type: " + phase, e); - return null; } + } catch (JMException e) { + LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + _clusterName + + " and phase type: " + phase, e); } + return _clusterEventMbeanMap.get(phase); } @@ -477,6 +473,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { _instanceMsgQueueSizes.put(instanceName, msgQueueSize); } + public void active() { + LOG.info("Active ClusterStatusMonitor"); + try { + register(this, getObjectName(clusterBeanName())); + } catch (Exception e) { + LOG.error("Fail to register ClusterStatusMonitor", e); + } + } + public void reset() { LOG.info("Reset ClusterStatusMonitor"); try { http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 3d3f86e..19a4049 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -78,13 +78,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { failedJobs ++; if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) { ctx.setWorkflowState(TaskState.FAILED); - _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED); + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED); + } for (String jobToFail : cfg.getJobDag().getAllNodes()) { if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) { ctx.setJobState(jobToFail, TaskState.ABORTED); // Skip aborted jobs latency since they are not accurate latency for job running time - _clusterStatusMonitor - .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED); + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED); + } } } return true; @@ -98,8 +101,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (!incomplete && cfg.isTerminable()) { ctx.setWorkflowState(TaskState.COMPLETED); - _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, - ctx.getFinishTime() - ctx.getStartTime()); + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, ctx.getFinishTime() - ctx.getStartTime()); + } return true; } http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java index b607add..69b553c 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java @@ -47,6 +47,7 @@ public class TestClusterEventStatusMonitor { private class ClusterStatusMonitorForTest extends ClusterStatusMonitor { public ClusterStatusMonitorForTest(String clusterName) { super(clusterName); + active(); } public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() { return _clusterEventMbeanMap; http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java index 3db8237..ccaba4e 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java @@ -19,14 +19,6 @@ package org.apache.helix.monitoring; * under the License. 
*/ -import java.io.IOException; -import java.util.Date; - -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.MBeanServerNotification; -import javax.management.MalformedObjectNameException; - import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.integration.common.ZkIntegrationTestBase; @@ -34,7 +26,6 @@ import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; -import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; @@ -45,8 +36,16 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import javax.management.*; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class); + private static final Logger LOG = + LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class); MockParticipantManager[] _participants; ClusterDistributedController[] _controllers; @@ -60,11 +59,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { @BeforeClass public void beforeClass() throws Exception { String className = TestHelper.getTestClassName(); - String methodName = TestHelper.getTestMethodName(); - _clusterNamePrefix = className + "_" + methodName; + _clusterNamePrefix = className; - System.out.println("START " + 
_clusterNamePrefix + " at " - + new Date(System.currentTimeMillis())); + System.out + .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis())); // setup 10 clusters for (int i = 0; i < clusterNb; i++) { @@ -83,8 +81,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { // setup controller cluster _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix; - TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // controller - // port + TestHelper.setupCluster(_controllerClusterName, ZK_ADDR, // controller + 0, // port "controller", // participant name prefix _clusterNamePrefix, // resource name prefix 1, // resources @@ -101,10 +99,9 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { _controllers[i].syncStart(); } - boolean result = - ClusterStateVerifier.verifyByZkCallback( - new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName), - 30000); + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName), + 30000); Assert.assertTrue(result, "Controller cluster NOT in ideal state"); // start first cluster @@ -116,9 +113,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { _participants[i].syncStart(); } - result = - ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, - _firstClusterName)); + result = ClusterStateVerifier + .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _firstClusterName)); Assert.assertTrue(result, "first cluster NOT in ideal state"); // add more controllers to controller cluster @@ -135,32 +131,23 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { } // verify controller cluster - result = - ClusterStateVerifier - .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, - 
_controllerClusterName)); + result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName)); Assert.assertTrue(result, "Controller cluster NOT in ideal state"); // verify first cluster - result = - ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, - _firstClusterName)); + result = ClusterStateVerifier + .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _firstClusterName)); Assert.assertTrue(result, "first cluster NOT in ideal state"); } @AfterClass public void afterClass() { System.out.println("Cleaning up..."); - for (int i = 0; i < 5; i++) { - _controllers[i].syncStop(); - } - - for (int i = 0; i < 5; i++) { + for (int i = 0; i < _participants.length; i++) { _participants[i].syncStop(); } - System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis())); - } class ParticipantMonitorListener extends ClusterMBeanObserver { @@ -168,8 +155,9 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { int _nMbeansUnregistered = 0; int _nMbeansRegistered = 0; - public ParticipantMonitorListener(String domain) throws InstanceNotFoundException, IOException, - MalformedObjectNameException, NullPointerException { + public ParticipantMonitorListener(String domain) + throws InstanceNotFoundException, IOException, MalformedObjectNameException, + NullPointerException { super(domain); } @@ -188,14 +176,21 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { } } - @Test (enabled = false) - public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException, - MalformedObjectNameException, NullPointerException, IOException, InterruptedException { - ParticipantMonitorListener listener = - new ParticipantMonitorListener(MonitorDomainNames.ClusterStatus.name()); + private void cleanupControllers() { + for (int i = 0; i < _controllers.length; i++) { + 
_controllers[i].syncStop(); + } + } - int nMbeansUnregistered = listener._nMbeansUnregistered; - int nMbeansRegistered = listener._nMbeansRegistered; + @Test + public void testClusterStatusMonitorLifecycle() + throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, + IOException, InterruptedException { + // Filter other unrelated clusters' metrics + QueryExp exp = + Query.match(Query.attr("SensorName"), Query.value("*" + _clusterNamePrefix + "*")); + Set<ObjectInstance> mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); _participants[0].disconnect(); @@ -203,8 +198,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { // No change in instance/resource mbean // Unregister 1 per-instance resource mbean and message queue mbean Thread.sleep(1000); - Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 2); - Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered); + int previousMBeanCount = mbeans.size(); + mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + Assert.assertEquals(mbeans.size(), previousMBeanCount - 2); HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor(); String firstControllerName = @@ -217,13 +214,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { } } firstController.disconnect(); - Thread.sleep(2000); - // 1 cluster status monitor, 1 resource monitor, 5 instances - // Unregister 1+4+1 per-instance resource mbean - // Register 4 per-instance resource mbean - Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33); - Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 29); + // 1 controller goes away + // 1 message queue mbean, 1 PerInstanceResource mbean, and one message queue mbean + Thread.sleep(2000); + previousMBeanCount = 
mbeans.size(); + mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + Assert.assertEquals(mbeans.size(), previousMBeanCount - 3); String instanceName = "localhost0_" + (12918 + 0); _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName); @@ -231,10 +229,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { // 1 participant comes back // No change in instance/resource mbean - // Register 1 per-instance resource mbean + // Register 1 per-instance resource mbean and 1 message queue mbean Thread.sleep(2000); - Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33); - Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 31); + previousMBeanCount = mbeans.size(); + mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + Assert.assertEquals(mbeans.size(), previousMBeanCount + 2); // Add a resource // Register 1 resource mbean @@ -248,15 +248,37 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase { Integer.parseInt(idealState.getReplicas())); Thread.sleep(2000); - Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33); - Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37); + // Add one resource, PerInstanceResource mbeans and 1 resource monitor + previousMBeanCount = mbeans.size(); + mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + Assert.assertEquals(mbeans.size(), previousMBeanCount + _participants.length + 1); // Remove a resource // No change in instance/resource mbean // Unregister 5 per-instance resource mbean setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1"); Thread.sleep(2000); - Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 39); - 
Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37); + previousMBeanCount = mbeans.size(); + mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + Assert.assertEquals(mbeans.size(), previousMBeanCount - (_participants.length + 1)); + + // Cleanup controllers then MBeans should all be removed. + cleanupControllers(); + Thread.sleep(2000); + + // Check if any MBeans leftover. + // Note that MessageQueueStatus is not bound with controller only. So it will still exist. + exp = Query + .and(Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))), + exp); + if (!ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty()) { + System.out.println(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + } + Assert.assertTrue(ManagementFactory.getPlatformMBeanServer() + .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty()); } } http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java index 723d969..755997b 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java @@ -54,6 +54,7 @@ public class TestClusterStatusMonitor { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName); + monitor.active(); ObjectName clusterMonitorObjName = 
monitor.getObjectName(monitor.clusterBeanName()); try { _server.getMBeanInfo(clusterMonitorObjName); http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java index df88397..61acdb5 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java @@ -66,7 +66,9 @@ public class TestRebalancerMetrics extends BaseStageTest { event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName)); + ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName); + monitor.active(); + event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor); runStage(event, new ReadClusterDataStage()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); @@ -110,7 +112,9 @@ public class TestRebalancerMetrics extends BaseStageTest { event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName)); + ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName); + monitor.active(); + event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor); 
runStage(event, new ReadClusterDataStage()); runStage(event, new BestPossibleStateCalcStage()); http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java index 51b048d..7b97ac3 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java @@ -54,7 +54,9 @@ public class TestTopStateHandoffMetrics extends BaseStageTest { resource.setStateModelDefRef("MasterSlave"); resource.addPartition(PARTITION); event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource)); - event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor("TestCluster")); + ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster"); + monitor.active(); + event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor); } @Test(dataProvider = "successCurrentStateInput")