Support event differentiated logging

Existing task pipeline logs are combined with regular pipeline logs, which makes 
debugging hard. Having logs tied to an event will help with future debugging.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/092b3f10
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/092b3f10
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/092b3f10

Branch: refs/heads/master
Commit: 092b3f1097bf7d3a97d2308db0e9da510d1bcb4c
Parents: 68fe74e
Author: Junkai Xue <j...@linkedin.com>
Authored: Thu Jul 12 17:01:27 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Thu Jul 12 17:01:27 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/AbstractDataCache.java  | 10 +++
 .../helix/common/caches/CurrentStateCache.java  | 16 +++--
 .../helix/common/caches/IdealStateCache.java    |  3 +-
 .../controller/GenericHelixController.java      | 36 ++++++----
 .../org/apache/helix/controller/LogUtil.java    | 37 ++++++++++
 .../controller/pipeline/AbstractBaseStage.java  |  3 +
 .../helix/controller/pipeline/Pipeline.java     |  6 +-
 .../stages/BestPossibleStateCalcStage.java      | 36 +++++-----
 .../helix/controller/stages/ClusterEvent.java   | 23 ++++++-
 .../stages/CompatibilityCheckStage.java         |  3 +-
 .../stages/CurrentStateComputationStage.java    |  7 +-
 .../stages/ExternalViewComputeStage.java        | 21 +++---
 .../stages/IntermediateStateCalcStage.java      | 71 +++++++++++---------
 .../stages/MessageGenerationPhase.java          | 44 ++++++------
 .../stages/MessageSelectionStage.java           |  6 +-
 .../controller/stages/MessageThrottleStage.java | 15 +++--
 .../stages/PersistAssignmentStage.java          |  6 +-
 .../controller/stages/ReadClusterDataStage.java |  6 +-
 .../stages/ResourceComputationStage.java        |  9 ++-
 .../stages/ResourceValidationStage.java         | 10 ++-
 .../controller/stages/TaskAssignmentStage.java  | 16 +++--
 21 files changed, 259 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index 56b4aa4..1c5924e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
@@ -31,7 +32,15 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractDataCache {
   private static Logger LOG = 
LoggerFactory.getLogger(AbstractDataCache.class.getName());
+  private String _eventId = "NO_ID";
 
+  public String getEventId() {
+    return _eventId;
+  }
+
+  public void setEventId(String eventId) {
+    _eventId = eventId;
+  }
 
   /**
    * Selectively fetch Helix Properties from ZK by comparing the version of 
local cached one with the one on ZK.
@@ -54,6 +63,7 @@ public abstract class AbstractDataCache {
       HelixProperty.Stat stat = stats.get(i);
       if (stat != null) {
         T property = cachedPropertyMap.get(key);
+
         if (property != null && property.getBucketSize() == 0 && 
property.getStat().equals(stat)) {
           refreshedPropertyMap.put(key, property);
         } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 4c6ecb8..9b1fb99 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
@@ -93,10 +94,12 @@ public class CurrentStateCache extends AbstractDataCache {
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END: CurrentStateCache.refresh() for cluster " + _clusterName + 
", took " + (endTime
-        - startTime) + " ms");
+    LogUtil.logInfo(LOG, getEventId(),
+        "END: CurrentStateCache.refresh() for cluster " + _clusterName + ", 
took " + (endTime
+            - startTime) + " ms");
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Current State freshed : ", 
_currentStateMap.toString()));
+      LogUtil.logDebug(LOG, getEventId(),
+          String.format("Current State freshed : ", 
_currentStateMap.toString()));
     }
     return true;
   }
@@ -129,9 +132,10 @@ public class CurrentStateCache extends AbstractDataCache {
             _currentStateCache));
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("# of CurrentStates reload: " + reloadKeys.size() + ", 
skipped:" + (
-          currentStateKeys.size() - reloadKeys.size()) + ". took " + 
(System.currentTimeMillis()
-          - start) + " ms to reload new current states for cluster: " + 
_clusterName);
+      LogUtil.logDebug(LOG, getEventId(),
+          "# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + (
+              currentStateKeys.size() - reloadKeys.size()) + ". took " + 
(System.currentTimeMillis()
+              - start) + " ms to reload new current states for cluster: " + 
_clusterName);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
index 12056e4..99de0bc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.model.IdealState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class IdealStateCache extends AbstractDataCache {
     _idealStateMap = new HashMap<>(newIdealStateMap);
 
     long endTime = System.currentTimeMillis();
-    LOG.info(
+    LogUtil.logInfo(LOG, getEventId(),
         "Refresh " + _idealStateMap.size() + " idealStates for cluster " + 
_clusterName + ", took "
             + (endTime - startTime) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 474d621..8d1e44b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -406,21 +407,23 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
 
     // add the cache
+    _cache.setEventId(event.getEventId());
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
 
     List<Pipeline> pipelines = cache.isTaskCache() ?
-        _taskRegistry.getPipelinesForEvent(event.getEventType()) :
-        _registry.getPipelinesForEvent(event.getEventType());
+        _taskRegistry.getPipelinesForEvent(event.getEventType()) : _registry
+        .getPipelinesForEvent(event.getEventType());
 
     if (pipelines == null || pipelines.size() == 0) {
-      logger.info(
-          "No " + getPipelineType(cache.isTaskCache()) + " pipeline to run for 
event:" + event
-              .getEventType());
+      logger.info(String
+          .format("No % pipeline to run for event: %s %s", 
getPipelineType(cache.isTaskCache()),
+              event.getEventType(), event.getEventId()));
       return;
     }
 
-    logger.info(String.format("START: Invoking %s controller pipeline for 
cluster %s event: %s",
-        manager.getClusterName(), getPipelineType(cache.isTaskCache()), 
event.getEventType()));
+    logger.info(String.format("START: Invoking %s controller pipeline for 
cluster %s event: %s  %s",
+        manager.getClusterName(), getPipelineType(cache.isTaskCache()), 
event.getEventType(),
+        event.getEventId()));
     long startTime = System.currentTimeMillis();
     boolean rebalanceFail = false;
     for (Pipeline pipeline : pipelines) {
@@ -463,9 +466,9 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
     long endTime = System.currentTimeMillis();
     logger.info(String
-        .format("END: Invoking %s controller pipeline for event: %s for 
cluster %s, took %d ms",
-            getPipelineType(cache.isTaskCache()), event.getEventType(), 
manager.getClusterName(),
-            (endTime - startTime)));
+        .format("END: Invoking %s controller pipeline for event: %s %s for 
cluster %s, took %d ms",
+            getPipelineType(cache.isTaskCache()), event.getEventType(), 
event.getEventId(),
+            manager.getClusterName(), (endTime - startTime)));
 
     if (!cache.isTaskCache()) {
       // report event process durations
@@ -685,7 +688,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
   private void pushToEventQueues(ClusterEventType eventType, 
NotificationContext changeContext,
       Map<String, Object> eventAttributes) {
-    ClusterEvent event = new ClusterEvent(_clusterName, eventType);
+    // No need for completed UUID, prefixed should be fine
+    String uid = UUID.randomUUID().toString().substring(0, 8);
+    ClusterEvent event = new ClusterEvent(_clusterName, eventType,
+        String.format("%s_%s", uid, PipelineTypes.DEFAULT.name()));
     event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), 
_asyncFIFOWorkerPool);
@@ -693,7 +699,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       event.addAttribute(attr.getKey(), attr.getValue());
     }
     _eventQueue.put(event);
-    _taskEventQueue.put(event.clone());
+    _taskEventQueue.put(event.clone(String.format("%s_%s", uid, 
PipelineTypes.TASK.name())));
   }
 
   @Override
@@ -887,13 +893,15 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       if (statusFlag) {
         statusFlag = false;
         logger.info("controller is now resumed from paused state");
-        ClusterEvent event = new ClusterEvent(_clusterName, 
ClusterEventType.Resume);
+        String uid = UUID.randomUUID().toString().substring(0, 8);
+        ClusterEvent event = new ClusterEvent(_clusterName, 
ClusterEventType.Resume,
+            String.format("%s_%s", uid, PipelineTypes.DEFAULT.name()));
         event.addAttribute(AttributeName.changeContext.name(), changeContext);
         event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
         event.addAttribute(AttributeName.eventData.name(), signal);
         event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), 
_asyncFIFOWorkerPool);
         _eventQueue.put(event);
-        _taskEventQueue.put(event.clone());
+        _taskEventQueue.put(event.clone(String.format("%s_%s", uid, 
PipelineTypes.TASK.name())));
       }
     }
     return statusFlag;

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java 
b/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java
new file mode 100644
index 0000000..4d3ae05
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/LogUtil.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller;
+
+import org.slf4j.Logger;
+
+public class LogUtil {
+  public static void logInfo(Logger logger, String eventInfo, String message) {
+    logger.info(String.format("Event %s : %s", eventInfo, message));
+  }
+
+  public static void logWarn(Logger logger, String eventInfo, String message) {
+    logger.warn(String.format("Event %s : %s", eventInfo, message));
+  }
+
+  public static void logError(Logger logger, String eventInfo, String message) 
{
+    logger.error(String.format("Event %s  %s", eventInfo, message));
+  }
+
+  public static void logDebug(Logger logger, String eventInfo, String message) 
{
+    logger.debug(String.format("Event %s  %s", eventInfo, message));
+  }
+
+  public static void logInfo(Logger logger, String eventInfo, String message, 
Exception e) {
+    logger.info(String.format("Event %s : %s", eventInfo, message), e);
+  }
+
+  public static void logWarn(Logger logger, String eventInfo, String message, 
Exception e) {
+    logger.warn(String.format("Event %s : %s", eventInfo, message), e);
+  }
+
+  public static void logError(Logger logger, String eventInfo, String message, 
Exception e) {
+    logger.error(String.format("Event %s  %s", eventInfo, message), e);
+  }
+
+  public static void logDebug(Logger logger, String eventInfo, String message, 
Exception e) {
+    logger.debug(String.format("Event %s  %s", eventInfo, message), e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
index 324ed02..37259a8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.pipeline;
  */
 
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.common.DedupEventProcessor;
@@ -27,6 +28,8 @@ import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 
 public class AbstractBaseStage implements Stage {
+  protected String _eventId;
+
   @Override
   public void init(StageContext context) {
 

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index 2946129..7f00759 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -65,9 +65,9 @@ public class Pipeline {
 
       long endTime = System.currentTimeMillis();
       long duration = endTime - startTime;
-      logger.info(String
-          .format("END %s for %s pipeline for cluster %s. took: %d ms ", 
stage.getStageName(),
-              _pipelineType, event.getClusterName(), duration));
+      logger.info(String.format("END %s for %s pipeline for cluster %s. took: 
%d ms for event %s",
+          stage.getStageName(), _pipelineType, event.getClusterName(), 
duration,
+          event.getEventId()));
 
       ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index a465f05..a6ba5b8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.AutoRebalancer;
@@ -49,6 +50,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
     final Map<String, Resource> resourceMap =
@@ -78,7 +80,8 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       final Map<String, InstanceConfig> instanceConfigMap = 
cache.getInstanceConfigMap();
       final Map<String, StateModelDefinition> stateModelDefMap = 
cache.getStateModelDefMap();
       asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
-        @Override public Object call() {
+        @Override
+        public Object call() {
           try {
             if (clusterStatusMonitor != null) {
               clusterStatusMonitor
@@ -86,7 +89,8 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
                       resourceMap, stateModelDefMap);
             }
           } catch (Exception e) {
-            logger.error("Could not update cluster status metrics!", e);
+            LogUtil
+                .logError(logger, _eventId, "Could not update cluster status 
metrics!", e);
           }
           return null;
         }
@@ -116,7 +120,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       Resource resource = itr.next().getResource();
       if (!computeResourceBestPossibleState(event, cache, currentStateOutput, 
resource, output)) {
         failureResources.add(resource.getResourceName());
-        logger.warn("Failed to calculate best possible states for " + 
resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId, "Failed to calculate best possible 
states for " + resource.getResourceName());
       }
     }
 
@@ -150,7 +154,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
             clusterStatusMonitor.setRebalanceFailureGauge(hasFailure);
           }
         } catch (Exception e) {
-          logger.error("Could not update cluster status!", e);
+          LogUtil.logError(logger, _eventId, "Could not update cluster 
status!", e);
         }
         return null;
       }
@@ -175,7 +179,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
                 .enableMaintenanceMode(manager.getClusterName(), true, errMsg);
           }
         } else {
-          logger.error("Failed to pause cluster, HelixManager is not set!");
+          LogUtil.logError(logger, _eventId, "Failed to pause cluster, 
HelixManager is not set!");
         }
         if (!cache.isTaskCache()) {
           updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, 
errMsg);
@@ -193,13 +197,13 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     // for each instanceName check if its alive then assign a state
 
     String resourceName = resource.getResourceName();
-    logger.debug("Processing resource:" + resourceName);
+    LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName);
     // Ideal state may be gone. In that case we need to get the state model 
name
     // from the current state
     IdealState idealState = cache.getIdealState(resourceName);
     if (idealState == null) {
       // if ideal state is deleted, use an empty one
-      logger.info("resource:" + resourceName + " does not exist anymore");
+      LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does 
not exist anymore");
       idealState = new IdealState(resourceName);
       idealState.setStateModelDefRef(resource.getStateModelDefRef());
     }
@@ -209,7 +213,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, 
resourceName);
 
     if (rebalancer == null || mappingCalculator == null) {
-      logger.error(
+      LogUtil.logError(logger, _eventId,
           "Error computing assignment for resource " + resourceName + ". no 
rebalancer found. rebalancer: " + rebalancer
               + " mappingCaculator: " + mappingCalculator);
     }
@@ -241,7 +245,8 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
         // Check if calculation is done successfully
         return checkBestPossibleStateCalculation(idealState);
       } catch (Exception e) {
-        logger.error("Error computing assignment for resource " + resourceName 
+ ". Skipping.", e);
+        LogUtil
+            .logError(logger, _eventId, "Error computing assignment for 
resource " + resourceName + ". Skipping.", e);
         // TODO : remove this part after debugging NPE
         StringBuilder sb = new StringBuilder();
 
@@ -254,7 +259,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
             String.format("PartitionAssignment is null : %s\n", 
partitionStateAssignment == null));
         sb.append(String.format("Output is null : %s\n", output == null));
 
-        logger.error(sb.toString());
+        LogUtil.logError(logger, _eventId, sb.toString());
       }
     }
     // Exception or rebalancer is not found
@@ -289,14 +294,15 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName != null) {
       if (logger.isDebugEnabled()) {
-        logger
-            .debug("resource " + resourceName + " use idealStateRebalancer " + 
rebalancerClassName);
+        LogUtil.logDebug(logger, _eventId,
+            "resource " + resourceName + " use idealStateRebalancer " + 
rebalancerClassName);
       }
       try {
         customizedRebalancer = Rebalancer.class
             .cast(HelixUtil.loadClass(getClass(), 
rebalancerClassName).newInstance());
       } catch (Exception e) {
-        logger.error("Exception while invoking custom rebalancer class:" + 
rebalancerClassName, e);
+        LogUtil.logError(logger, _eventId,
+            "Exception while invoking custom rebalancer class:" + 
rebalancerClassName, e);
       }
     }
 
@@ -324,7 +330,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       rebalancer = customizedRebalancer;
       break;
     default:
-      logger.error(
+      LogUtil.logError(logger, _eventId,
           "Fail to find the rebalancer, invalid rebalance mode " + 
idealState.getRebalanceMode());
       break;
     }
@@ -339,7 +345,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       try {
         mappingCalculator = MappingCalculator.class.cast(rebalancer);
       } catch (ClassCastException e) {
-        logger.warn(
+        LogUtil.logWarn(logger, _eventId,
             "Rebalancer does not have a mapping calculator, defaulting to 
SEMI_AUTO, resource: "
                 + resourceName);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index 26f4f17..e82b0e4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
 import java.util.HashMap;
 import java.util.Map;
 
+import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,19 +32,27 @@ public class ClusterEvent {
   private final Map<String, Object> _eventAttributeMap;
   private long _creationTime;
   private String _clusterName;
+  private String _eventId;
 
   @Deprecated
   public ClusterEvent(ClusterEventType eventType) {
     _eventType = eventType;
     _eventAttributeMap = new HashMap<>();
     _creationTime = System.currentTimeMillis();
+    _eventId = UUID.randomUUID().toString();
   }
 
   public ClusterEvent(String clusterName, ClusterEventType eventType) {
+    this(clusterName, eventType, UUID.randomUUID().toString());
+  }
+
+  public ClusterEvent(String clusterName, ClusterEventType eventType, String 
eventId) {
     _clusterName = clusterName;
     _eventType = eventType;
+
     _eventAttributeMap = new HashMap<>();
     _creationTime = System.currentTimeMillis();
+    _eventId = eventId;
   }
 
   public void addAttribute(String attrName, Object attrValue) {
@@ -73,6 +82,15 @@ public class ClusterEvent {
     _clusterName = clusterName;
   }
 
+  public void setEventId(String eventId) {
+    _eventId = eventId;
+  }
+
+  public String getEventId() {
+    return _eventId;
+  }
+
+
   @SuppressWarnings("unchecked")
   public <T extends Object> T getAttribute(String attrName) {
     Object ret = _eventAttributeMap.get(attrName);
@@ -85,6 +103,7 @@ public class ClusterEvent {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
+    sb.append(String.format("Event id : %s", _eventId.toString()));
     sb.append("name:" + _eventType.name()).append("\n");
     for (String key : _eventAttributeMap.keySet()) {
       
sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n");
@@ -92,8 +111,8 @@ public class ClusterEvent {
     return sb.toString();
   }
 
-  public ClusterEvent clone() {
-    ClusterEvent newEvent = new ClusterEvent(_clusterName, _eventType);
+  public ClusterEvent clone(String eventId) {
+    ClusterEvent newEvent = new ClusterEvent(_clusterName, _eventType, 
eventId);
     newEvent.setCreationTime(_creationTime);
     for (String attributeName : _eventAttributeMap.keySet()) {
       newEvent.addAttribute(attributeName, 
_eventAttributeMap.get(attributeName));

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 28a5a9e..8a0a719 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.LiveInstance;
@@ -56,7 +57,7 @@ public class CompatibilityCheckStage extends 
AbstractBaseStage {
                 + 
properties.getProperty("miminum_supported_version.participant")
                 + ", participant: " + liveInstance.getInstanceName() + ", 
participantVersion: "
                 + participantVersion;
-        LOG.error(errorMsg);
+        LogUtil.logError(LOG, event.getEventId(), errorMsg);
         throw new StageException(errorMsg);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 64cd4f4..a56d194 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -24,6 +24,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     final Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
 
@@ -303,7 +305,8 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
     // 3. if no clue about previous topstate or any related pending message, 
use the current system time.
     if (startTime == NOT_RECORDED) {
-      LOG.warn("Cannot confirm top state missing start time. Use the current 
system time as the start time.");
+      LogUtil.logWarn(LOG, _eventId,
+          "Cannot confirm top state missing start time. Use the current system 
time as the start time.");
       startTime = System.currentTimeMillis();
     }
 
@@ -343,7 +346,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
 
     if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
-      LOG.info(String.format("Missing topstate duration is %d for partition %s",
+      LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
           handOffEndTime - handOffStartTime, partition.getPartitionName()));
       if (clusterStatusMonitor != null) {
         clusterStatusMonitor

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 1f455cb..bf7be01 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
 import org.apache.helix.*;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.StageException;
@@ -44,6 +45,7 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
 
   @Override
   public void execute(final ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
@@ -158,7 +160,8 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
         it.remove();
         // remove the external view if the external view exists
         if (curExtViews.containsKey(resourceName)) {
-          LOG.info("Remove externalView for resource: " + resourceName);
+          LogUtil
+              .logInfo(LOG, _eventId, "Remove externalView for resource: " + 
resourceName);
           dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
           externalviewsToRemove.add(resourceName);
         }
@@ -176,7 +179,7 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
     // remove dead external-views
     for (String resourceName : curExtViews.keySet()) {
       if (!resourceMap.keySet().contains(resourceName)) {
-        LOG.info("Remove externalView for resource: " + resourceName);
+        LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + 
resourceName);
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
         externalviewsToRemove.add(resourceName);
       }
@@ -201,19 +204,19 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
 
     for (String taskPartitionName : ev.getPartitionSet()) {
       for (String taskState : ev.getStateMap(taskPartitionName).values()) {
-        if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
-            || taskState.equalsIgnoreCase("COMPLETED")) {
-          LOG.info(taskPartitionName + " finished as " + taskState);
+        if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) || 
taskState
+            .equalsIgnoreCase("COMPLETED")) {
+          LogUtil.logInfo(LOG, _eventId, taskPartitionName + " finished as " + 
taskState);
           finishedTasks.getListFields().put(taskPartitionName, emptyList);
           finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
 
           // Update original scheduler message status update
           if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) 
!= null) {
-            String controllerMsgId =
-                taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                    
.get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+            String controllerMsgId = 
taskQueueIdealState.getRecord().getMapField(taskPartitionName)
+                .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
             if (controllerMsgId != null) {
-              LOG.info(taskPartitionName + " finished with controllerMsg " + 
controllerMsgId);
+              LogUtil.logInfo(LOG, _eventId,
+                  taskPartitionName + " finished with controllerMsg " + 
controllerMsgId);
               if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, 
String>());
               }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 0f11ecd..3bede5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -24,6 +24,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionThrottleConfig.RebalanceType;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -44,6 +45,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
 
     BestPossibleStateOutput bestPossibleStateOutput =
@@ -132,8 +134,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       IdealState idealState = dataCache.getIdealState(resourceName);
       if (idealState == null) {
         // If IdealState is null, use an empty one
-        logger.info("IdealState for resource {} does not exist; resource may not exist anymore",
-            resourceName);
+        LogUtil.logInfo(logger, _eventId, String
+            .format("IdealState for resource %s does not exist; resource may not exist anymore",
+                resourceName));
         idealState = new IdealState(resourceName);
         idealState.setStateModelDefRef(resource.getStateModelDefRef());
       }
@@ -202,7 +205,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
               
manager.getClusterManagmentTool().enableMaintenanceMode(manager.getClusterName(),
                   true, errMsg);
             } else {
-              logger.error(
+              LogUtil.logError(logger, _eventId,
                   "HelixManager is not set/null! Failed to pause this 
cluster/enable maintenance"
                       + " mode due to an instance being assigned more 
replicas/partitions than "
                       + "the limit.");
@@ -233,7 +236,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       Map<String, List<String>> preferenceLists,
       StateTransitionThrottleController throttleController) {
     String resourceName = resource.getResourceName();
-    logger.debug("Processing resource: {}", resourceName);
+    LogUtil.logDebug(logger, _eventId, String.format("Processing resource: 
%s", resourceName));
 
     // Throttling is applied only on FULL-AUTO mode
     if (!throttleController.isThrottleEnabled()
@@ -288,16 +291,19 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     }
 
     if (!partitionsNeedRecovery.isEmpty()) {
-      logger.info("Recovery balance needed for {} partitions: {}", resourceName,
-          partitionsNeedRecovery);
+      LogUtil.logInfo(logger, _eventId, String
+          .format("Recovery balance needed for %s partitions: %s", resourceName,
+              partitionsNeedRecovery));
     }
     if (!partitionsNeedLoadBalance.isEmpty()) {
-      logger.info("Load balance needed for partitions: {}", resourceName,
-          partitionsNeedLoadBalance);
+      LogUtil.logInfo(logger, _eventId, String
+          .format("Load balance needed for %s partitions: %s", resourceName,
+              partitionsNeedLoadBalance));
     }
     if (!partitionsWithErrorStateReplica.isEmpty()) {
-      logger.info("Partition currently has an ERROR replica in {} partitions: {}", resourceName,
-          partitionsWithErrorStateReplica);
+      LogUtil.logInfo(logger, _eventId, String
+          .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName,
+              partitionsWithErrorStateReplica));
     }
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
@@ -352,7 +358,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           intermediatePartitionStateMap);
     }
 
-    logger.debug("End processing resource: {}", resourceName);
+    LogUtil
+        .logDebug(logger, _eventId, String.format("End processing resource: 
%s", resourceName));
     return intermediatePartitionStateMap;
   }
 
@@ -474,7 +481,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           currentStateOutput, bestPossiblePartitionStateMap, 
partitionRecoveryBalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
     }
-    logger.info(String.format(
+    LogUtil.logInfo(logger, _eventId, String.format(
         "For resource %s: Num of partitions needing recovery: %d, Num of 
partitions needing recovery"
             + " but throttled (not recovered): %d",
         resourceName, partitionsNeedRecovery.size(), 
partitionRecoveryBalanceThrottled.size()));
@@ -542,7 +549,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           currentStateOutput, bestPossiblePartitionStateMap, 
partitionsLoadbalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
     }
-    logger.info(String.format(
+    LogUtil.logInfo(logger, _eventId, String.format(
         "For resource %s: Num of partitions needing load-balance: %d, Num of 
partitions needing"
             + " load-balance but throttled (not load-balanced): %d",
         resourceName, partitionsNeedLoadbalance.size(), 
partitionsLoadbalanceThrottled.size()));
@@ -578,8 +585,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     if (throttleController.shouldThrottleForResource(rebalanceType, 
resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        logger.debug("Throttled on partition: {} in resource: {}", 
partition.getPartitionName(),
-            resourceName);
+        LogUtil.logDebug(logger, _eventId, String
+            .format("Throttled on partition: %s in resource: %s", 
partition.getPartitionName(),
+                resourceName));
       }
     } else {
       // throttle if any of the instances are not able to accept state 
transitions
@@ -590,9 +598,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           if (throttleController.shouldThrottleForInstance(rebalanceType, 
instance)) {
             hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "Throttled because of instance: {} for partition: {} in 
resource: {}", instance,
-                  partition.getPartitionName(), resourceName);
+              LogUtil.logDebug(logger, _eventId, String
+                  .format("Throttled because of instance: %s for partition: %s 
in resource: %s",
+                      instance, partition.getPartitionName(), resourceName));
             }
             break;
           }
@@ -698,22 +706,25 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       PartitionStateMap intermediateStateMap) {
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Partitions need recovery: {}\nPartitions get throttled on recovery: {}",
-          recoveryPartitions, recoveryThrottledPartitions);
-      logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: {}",
-          loadbalancePartitions, loadbalanceThrottledPartitions);
+      LogUtil.logDebug(logger, _eventId, String
+          .format("Partitions need recovery: %s\nPartitions get throttled on recovery: %s",
+              recoveryPartitions, recoveryThrottledPartitions));
+      LogUtil.logDebug(logger, _eventId, String
+          .format("Partitions need loadbalance: %s\nPartitions get throttled on load-balance: %s",
+              loadbalancePartitions, loadbalanceThrottledPartitions));
     }
 
     for (Partition partition : allPartitions) {
       if (logger.isDebugEnabled()) {
-        logger.debug(partition + ": Best possible map: {}",
-            bestPossibleStateMap.getPartitionMap(partition));
-        logger.debug(partition + ": Current State: {}",
-            currentStateOutput.getCurrentStateMap(resource, partition));
-        logger.debug(partition + ": Pending state: {}",
-            currentStateOutput.getPendingMessageMap(resource, partition));
-        logger.debug(partition + ": Intermediate state: {}",
-            intermediateStateMap.getPartitionMap(partition));
+        LogUtil.logDebug(logger, _eventId, String
+            .format("%s : Best possible map: %s", partition,
+                bestPossibleStateMap.getPartitionMap(partition)));
+        LogUtil.logDebug(logger, _eventId, String.format("%s : Current State: %s", partition,
+            currentStateOutput.getCurrentStateMap(resource, partition)));
+        LogUtil.logDebug(logger, _eventId, String.format("%s: Pending state: %s", partition,
+            currentStateOutput.getPendingMessageMap(resource, partition)));
+        LogUtil.logDebug(logger, _eventId, String
+            .format("%s: Intermediate state: %s", partition, intermediateStateMap.getPartitionMap(partition)));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 5a4f039..e21c607 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -65,6 +66,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
@@ -92,7 +94,7 @@ public class MessageGenerationPhase extends AbstractBaseStage 
{
 
       StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
       if (stateModelDef == null) {
-        logger.error(
+        LogUtil.logError(logger, _eventId,
             "State Model Definition null, skip generating messages for 
resource: " + resourceName);
         continue;
       }
@@ -137,10 +139,10 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
 
           if (shouldCleanUpPendingMessage(pendingMessage, currentState,
               currentStateOutput.getEndTime(resourceName, partition, 
instanceName))) {
-            logger.info(
-                "Adding pending message {} on instance {} to clean up. Msg: {}->{}, current state of resource {}:{} is {}",
+            LogUtil.logInfo(logger, _eventId, String.format(
+                "Adding pending message %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s",
                 pendingMessage.getMsgId(), instanceName, pendingMessage.getFromState(),
-                pendingMessage.getToState(), resourceName, partition, currentState);
+                pendingMessage.getToState(), resourceName, partition, currentState));
             if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
                 pendingMessagesToCleanUp.put(instanceName, new HashMap<String, 
Message>());
             }
@@ -158,29 +160,31 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
             }
           } else {
             if (nextState == null) {
-              logger.error("Unable to find a next state for resource: " + 
resource.getResourceName()
-                  + " partition: " + partition.getPartitionName() + " from 
stateModelDefinition"
-                  + stateModelDef.getClass() + " from:" + currentState + " 
to:" + desiredState);
+              LogUtil.logError(logger, _eventId,
+                  "Unable to find a next state for resource: " + 
resource.getResourceName()
+                      + " partition: " + partition.getPartitionName() + " from 
stateModelDefinition"
+                      + stateModelDef.getClass() + " from:" + currentState + " 
to:" + desiredState);
               continue;
             }
 
             if (pendingMessage != null) {
               String pendingState = pendingMessage.getToState();
               if (nextState.equalsIgnoreCase(pendingState)) {
-                logger.debug(
+                LogUtil.logDebug(logger, _eventId,
                     "Message already exists for " + instanceName + " to 
transit " + resource
                         .getResourceName() + "." + 
partition.getPartitionName() + " from "
                         + currentState + " to " + nextState);
               } else if (currentState.equalsIgnoreCase(pendingState)) {
-                logger.info(
+                LogUtil.logInfo(logger, _eventId,
                     "Message hasn't been removed for " + instanceName + " to 
transit " + resource
                         .getResourceName() + "." + 
partition.getPartitionName() + " to "
                         + pendingState + ", desiredState: " + desiredState);
               } else {
-                logger.info("IdealState changed before state transition 
completes for " + resource
-                    .getResourceName() + "." + partition.getPartitionName() + 
" on " + instanceName
-                    + ", pendingState: " + pendingState + ", currentState: " + 
currentState
-                    + ", nextState: " + nextState);
+                LogUtil.logInfo(logger, _eventId,
+                    "IdealState changed before state transition completes for 
" + resource
+                        .getResourceName() + "." + 
partition.getPartitionName() + " on "
+                        + instanceName + ", pendingState: " + pendingState + 
", currentState: "
+                        + currentState + ", nextState: " + nextState);
 
                 message = createStateTransitionCancellationMessage(manager, 
resource,
                     partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
@@ -195,7 +199,7 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                       stateModelDef.getId());
 
               if (logger.isDebugEnabled()) {
-                logger.debug(String.format(
+                LogUtil.logDebug(logger, _eventId, String.format(
                     "Resource %s partition %s for instance %s with 
currentState %s and nextState %s",
                     resource, partition.getPartitionName(), instanceName, 
currentState, nextState));
               }
@@ -268,7 +272,8 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
             String instanceName = entry.getKey();
             for (Message msg : entry.getValue().values()) {
               if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(), 
instanceName))) {
-                logger.info("Deleted message {} from instance {}", 
msg.getMsgId(), instanceName);
+                LogUtil.logInfo(logger, _eventId, String
+                    .format("Deleted message %s from instance %s", 
msg.getMsgId(), instanceName));
               }
             }
           }
@@ -330,9 +335,10 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
       String currentState) {
 
     if (isCancellationEnabled && cancellationMessage == null) {
-      logger.info("Send cancellation message of the state transition for " + 
resource.getResourceName() + "."
-          + partitionName + " on " + instanceName + ", currentState: " + 
currentState + ", nextState: "
-          + (nextState == null ? "N/A" : nextState));
+      LogUtil.logInfo(logger, _eventId,
+          "Send cancellation message of the state transition for " + 
resource.getResourceName()
+              + "." + partitionName + " on " + instanceName + ", currentState: 
" + currentState
+              + ", nextState: " + (nextState == null ? "N/A" : nextState));
 
       String uuid = UUID.randomUUID().toString();
       Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, 
uuid);
@@ -382,7 +388,7 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
       try {
         timeout = Integer.parseInt(timeOutStr);
       } catch (Exception e) {
-        logger.error("", e);
+        LogUtil.logError(logger, _eventId, "", e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index d683899..a061598 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
@@ -57,6 +58,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
     CurrentStateOutput currentStateOutput =
@@ -200,7 +202,9 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
               }
             } else {
               // reach upper-bound of message for the topState, will not send 
the message
-              LOG.info("Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() + ", not send message: " + message);
+              LogUtil.logInfo(LOG, _eventId,
+                  "Reach upper_bound: " + stateConstraints.get(toState).getUpperBound()
+                      + ", not send message: " + message);
             }
             continue;
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index e918cd7..c504199 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConstraints;
@@ -52,16 +53,18 @@ public class MessageThrottleStage extends AbstractBaseStage 
{
         value = Integer.MAX_VALUE;
         break;
       default:
-        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default 
value:"
-            + Integer.MAX_VALUE);
+        LogUtil.logError(LOG, _eventId,
+            "Invalid constraintValue token:" + valueStr + ". Use default 
value:"
+                + Integer.MAX_VALUE);
         break;
       }
     } catch (Exception e) {
       try {
         value = Integer.parseInt(valueStr);
       } catch (NumberFormatException ne) {
-        LOG.error("Invalid constraintValue string:" + valueStr + ". Use 
default value:"
-            + Integer.MAX_VALUE);
+        LogUtil.logError(LOG, _eventId,
+            "Invalid constraintValue string:" + valueStr + ". Use default 
value:"
+                + Integer.MAX_VALUE);
       }
     }
     return value;
@@ -113,6 +116,7 @@ public class MessageThrottleStage extends AbstractBaseStage 
{
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     MessageSelectionStageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -176,7 +180,8 @@ public class MessageThrottleStage extends AbstractBaseStage 
{
 
           if (LOG.isDebugEnabled()) {
             // TODO: printout constraint item that throttles the message
-            LOG.debug("message: " + message + " is throttled by constraint: " 
+ item);
+            LogUtil.logDebug(LOG, _eventId,
+                "message: " + message + " is throttled by constraint: " + 
item);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index ca83445..f909c9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -30,10 +30,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -41,6 +39,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +77,8 @@ public class PersistAssignmentStage extends 
AbstractAsyncBaseStage {
       if (resource != null) {
         final IdealState idealState = cache.getIdealState(resourceId);
         if (idealState == null) {
-          LOG.warn("IdealState not found for resource " + resourceId);
+          LogUtil
+              .logWarn(LOG, event.getEventId(), "IdealState not found for 
resource " + resourceId);
           continue;
         }
         IdealState.RebalanceMode mode = idealState.getRebalanceMode();

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 036a913..2a9e4c3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConfig;
@@ -45,6 +46,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
     if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
@@ -66,7 +68,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
         @Override public Object call() {
           // Update the cluster status gauges
           if (clusterStatusMonitor != null) {
-            logger.debug("Update cluster status monitors");
+            LogUtil.logDebug(logger, _eventId, "Update cluster status 
monitors");
 
             Set<String> instanceSet = Sets.newHashSet();
             Set<String> liveInstanceSet = Sets.newHashSet();
@@ -97,7 +99,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
             clusterStatusMonitor
                 .setClusterInstanceStatus(liveInstanceSet, instanceSet, 
disabledInstanceSet,
                     disabledPartitions, oldDisabledPartitions, tags);
-            logger.debug("Complete cluster status monitors update.");
+            LogUtil.logDebug(logger, _eventId, "Complete cluster status 
monitors update.");
           }
           return null;
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 6c2e916..02a175a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Resource;
 import org.apache.helix.task.TaskConstants;
+import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
@@ -142,9 +144,10 @@ public class ResourceComputationStage extends AbstractBaseStage {
           }
 
           if (currentState.getStateModelDefRef() == null) {
-            LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
-                + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
-                + currentState.getPartitionStateMap().values());
+            LogUtil.logError(LOG, _eventId,
+                "state model def is null." + "resource:" + currentState.getResourceName()
+                    + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+                    + currentState.getPartitionStateMap().values());
             throw new StageException("State model def is null for resource:"
                 + currentState.getResourceName());
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index d60efe4..d4e76c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.Map;
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
@@ -34,6 +35,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
@@ -60,7 +62,8 @@ public class ResourceValidationStage extends AbstractBaseStage {
           }
         }
         if (!hasMatchingRule) {
-          LOG.warn("Resource " + resourceName + " does not have a valid ideal state!");
+          LogUtil.logWarn(LOG, _eventId,
+              "Resource " + resourceName + " does not have a valid ideal state!");
           resourceMap.remove(resourceName);
         }
       }
@@ -69,8 +72,9 @@ public class ResourceValidationStage extends AbstractBaseStage {
       String stateModelDefRef = idealState.getStateModelDefRef();
       StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefRef);
       if (stateModelDef == null) {
-        LOG.warn("Resource " + resourceName + " uses state model " + stateModelDefRef
-            + ", but it is not on the cluster!");
+        LogUtil.logWarn(LOG, _eventId,
+            "Resource " + resourceName + " uses state model " + stateModelDefRef
+                + ", but it is not on the cluster!");
         resourceMap.remove(resourceName);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/092b3f10/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index d8ccd0f..c196a26 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.LiveInstance;
@@ -45,6 +46,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
     Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
@@ -84,7 +86,7 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     long cacheStart = System.currentTimeMillis();
     cache.cacheMessages(outputMessages);
     long cacheEnd = System.currentTimeMillis();
-    logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms");
+    LogUtil.logDebug(logger, _eventId, "Caching messages took " + (cacheEnd - cacheStart) + " ms");
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
@@ -139,17 +141,19 @@ public class TaskAssignmentStage extends AbstractBaseStage {
 
     List<PropertyKey> keys = new ArrayList<PropertyKey>();
     for (Message message : messages) {
-      logger.info(
+      LogUtil.logInfo(logger, _eventId,
           "Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit "
               + message.getResourceName() + "." + message.getPartitionName() + "|" + message
               .getPartitionNames() + " from:" + message.getFromState() + " to:" + message
               .getToState() + ", relayMessages: " + message.getRelayMessages().size());
       if (message.hasRelayMessages()) {
         for (Message msg : message.getRelayMessages().values()) {
-          logger.info("Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit "
-              + msg.getResourceName() + "." + msg.getPartitionName() + "|" + msg.getPartitionNames() + " from:"
-              + msg.getFromState() + " to:" + msg.getToState() + ", relayFrom: " + msg.getRelaySrcHost()
-              + ", attached to message: " + message.getMsgId());
+          LogUtil.logInfo(logger, _eventId,
+              "Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit "
+                  + msg.getResourceName() + "." + msg.getPartitionName() + "|" + msg
+                  .getPartitionNames() + " from:" + msg.getFromState() + " to:" + msg.getToState()
+                  + ", relayFrom: " + msg.getRelaySrcHost() + ", attached to message: " + message
+                  .getMsgId());
         }
       }
 

Reply via email to