pkuwm commented on a change in pull request #1187:
URL: https://github.com/apache/helix/pull/1187#discussion_r464040813



##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
##########
@@ -24,32 +24,63 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.Partition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class CustomizedStateOutput {
+  private static Logger LOG = 
LoggerFactory.getLogger(CustomizedStateOutput.class);

Review comment:
       Should this logger be declared `static final`?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
##########
@@ -24,32 +24,63 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.Partition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class CustomizedStateOutput {
+  private static Logger LOG = 
LoggerFactory.getLogger(CustomizedStateOutput.class);
+
   // stateType -> (resourceName -> (Partition -> (instanceName -> 
customizedState)))
   private final Map<String, Map<String, Map<Partition, Map<String, String>>>> 
_customizedStateMap;
+  // stateType -> (resourceName -> (Partition -> (instanceName -> startTime)))
+  private final Map<String, Map<String, Map<Partition, Map<String, String>>>> 
_startTimeMap;
 
   public CustomizedStateOutput() {
     _customizedStateMap = new HashMap<>();
+    _startTimeMap = new HashMap<>();
   }
 
   public void setCustomizedState(String stateType, String resourceName, 
Partition partition,
       String instanceName, String state) {
-    if (!_customizedStateMap.containsKey(stateType)) {
-      _customizedStateMap
-          .put(stateType, new HashMap<String, Map<Partition, Map<String, 
String>>>());
+    
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty.CURRENT_STATE,
 stateType,
+        resourceName, partition, instanceName, state);
+  }
+
+  public void setStartTime(String stateType, String resourceName, Partition 
partition,
+      String instanceName, String state) {
+    
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty.START_TIME, 
stateType,
+        resourceName, partition, instanceName, state);
+  }
+
+  private void 
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty propertyName,
+      String stateType, String resourceName, Partition partition, String 
instanceName,
+      String state) {
+    Map<String, Map<String, Map<Partition, Map<String, String>>>> mapToUpdate;
+    switch (propertyName) {
+      case CURRENT_STATE:
+        mapToUpdate = _customizedStateMap;
+        break;
+      case START_TIME:
+        mapToUpdate = _startTimeMap;
+        break;
+      default:
+        LOG.error(
+            "The customized state property is not supported, could not update 
customized state output.");
+        return;
+    }
+    if (!mapToUpdate.containsKey(stateType)) {
+      mapToUpdate.put(stateType, new HashMap<String, Map<Partition, 
Map<String, String>>>());
     }
-    if (!_customizedStateMap.get(stateType).containsKey(resourceName)) {
-      _customizedStateMap.get(stateType)
-          .put(resourceName, new HashMap<Partition, Map<String, String>>());
+    if (!mapToUpdate.get(stateType).containsKey(resourceName)) {
+      mapToUpdate.get(stateType).put(resourceName, new HashMap<Partition, 
Map<String, String>>());
     }
-    if 
(!_customizedStateMap.get(stateType).get(resourceName).containsKey(partition)) {
-      _customizedStateMap.get(stateType).get(resourceName)
-          .put(partition, new HashMap<String, String>());
+    if (!mapToUpdate.get(stateType).get(resourceName).containsKey(partition)) {
+      mapToUpdate.get(stateType).get(resourceName).put(partition, new 
HashMap<String, String>());
     }
-    
_customizedStateMap.get(stateType).get(resourceName).get(partition).put(instanceName,
 state);
+    
mapToUpdate.get(stateType).get(resourceName).get(partition).put(instanceName, 
state);

Review comment:
       We could make it simpler and cleaner by adopting `computeIfAbsent`:
   ```
   mapToUpdate.computeIfAbsent(stateType, k -> new HashMap<>())
     .computeIfAbsent(resourceName, k -> new HashMap<>())
     .computeIfAbsent(partition, k -> new HashMap<>())
     .put(instanceName, state);
   ```

##########
File path: 
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
##########
@@ -0,0 +1,190 @@
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.collections.Lists;
+
+
+public class CustomizedViewMonitor extends DynamicMBeanProvider {
+  private static Logger LOG = 
LoggerFactory.getLogger(CustomizedViewMonitor.class);

Review comment:
       This logger should be declared `static final`.

##########
File path: 
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
##########
@@ -0,0 +1,190 @@
+package org.apache.helix.monitoring.mbeans;
+

Review comment:
       The Apache License header appears to be missing from this new file.

##########
File path: 
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
##########
@@ -0,0 +1,190 @@
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.collections.Lists;
+

Review comment:
       The latest helix-style no longer produces two consecutive empty lines. 
It may be good to update the helix-style settings in your IntelliJ.

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
##########
@@ -24,32 +24,63 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.Partition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class CustomizedStateOutput {
+  private static Logger LOG = 
LoggerFactory.getLogger(CustomizedStateOutput.class);
+
   // stateType -> (resourceName -> (Partition -> (instanceName -> 
customizedState)))
   private final Map<String, Map<String, Map<Partition, Map<String, String>>>> 
_customizedStateMap;
+  // stateType -> (resourceName -> (Partition -> (instanceName -> startTime)))
+  private final Map<String, Map<String, Map<Partition, Map<String, String>>>> 
_startTimeMap;
 
   public CustomizedStateOutput() {
     _customizedStateMap = new HashMap<>();
+    _startTimeMap = new HashMap<>();
   }
 
   public void setCustomizedState(String stateType, String resourceName, 
Partition partition,
       String instanceName, String state) {
-    if (!_customizedStateMap.containsKey(stateType)) {
-      _customizedStateMap
-          .put(stateType, new HashMap<String, Map<Partition, Map<String, 
String>>>());
+    
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty.CURRENT_STATE,
 stateType,
+        resourceName, partition, instanceName, state);
+  }
+
+  public void setStartTime(String stateType, String resourceName, Partition 
partition,
+      String instanceName, String state) {
+    
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty.START_TIME, 
stateType,
+        resourceName, partition, instanceName, state);
+  }
+
+  private void 
setCustomizedStateProperty(CustomizedState.CustomizedStateProperty propertyName,
+      String stateType, String resourceName, Partition partition, String 
instanceName,
+      String state) {
+    Map<String, Map<String, Map<Partition, Map<String, String>>>> mapToUpdate;
+    switch (propertyName) {
+      case CURRENT_STATE:
+        mapToUpdate = _customizedStateMap;
+        break;
+      case START_TIME:
+        mapToUpdate = _startTimeMap;
+        break;
+      default:
+        LOG.error(

Review comment:
       Often, the only use for the ERROR level within a certain application is 
when a valuable business use case cannot be completed due to technical issues 
or a bug. 
   I'd prefer not to use this logging level too generously because that would 
add too much noise to the logs and reduce the significance of a single ERROR 
event.
   
   The WARN level should be used when something bad happened, but the 
application still has the chance to heal itself or the issue can wait a day or 
two to be fixed.

##########
File path: 
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
##########
@@ -0,0 +1,190 @@
+package org.apache.helix.monitoring.mbeans;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.collections.Lists;
+
+
+public class CustomizedViewMonitor extends DynamicMBeanProvider {
+  private static Logger LOG = 
LoggerFactory.getLogger(CustomizedViewMonitor.class);
+
+  private static final String MBEAN_DESCRIPTION = "Helix Customized View 
Aggregation Monitor";
+  private final String _clusterName;
+  private final String _sensorName;
+  private HistogramDynamicMetric _updateToAggregationLatencyGauge;
+
+  public CustomizedViewMonitor(String clusterName) {
+    _clusterName = clusterName;
+    _sensorName =
+        String.format("%s.%s", MonitorDomainNames.RoutingTableProvider.name(), 
_clusterName);
+    _updateToAggregationLatencyGauge = new 
HistogramDynamicMetric("UpdateToAggregationLatencyGauge",
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), 
TimeUnit.MILLISECONDS)));
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_updateToAggregationLatencyGauge);
+    doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName());
+    return this;
+  }
+
+  private ObjectName getMBeanName() throws MalformedObjectNameException {
+    return new ObjectName(String
+        .format("%s:%s=%s", MonitorDomainNames.CustomizedView.name(), 
"Cluster", _clusterName));
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  private void recordUpdateToAggregationLatency(long latency) {
+    if (_updateToAggregationLatencyGauge != null) {
+      _updateToAggregationLatencyGauge.updateValue(latency);
+    }
+  }
+
+  /**
+   * Find updated customized states and report the aggregation latency of each 
customized state
+   * @param updatedCustomizedViews Customized views that have been updated, 
obtained from CustomizedStateOutput
+   * @param curCustomizedViews Current customized view values from the 
CustomizedViewCache
+   * @param updatedStartTimestamps All customized state START_TIME property 
values from CustomizedStateOutput
+   * @param updateSuccess If the customized view update to ZK is successful or 
not
+   * @param endTime The timestamp when the new customized view is updated to ZK
+   */
+  public void reportLatency(List<CustomizedView> updatedCustomizedViews,

Review comment:
       Do we need to consider concurrency here?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
##########
@@ -152,6 +161,31 @@ private void computeCustomizedStateView(final Resource 
resource, final String st
     if (curCustomizedView == null || 
!curCustomizedView.getRecord().equals(view.getRecord())) {
       // Add customized view to the list which will be written to ZK later.
       updatedCustomizedViews.add(view);
+      updatedStartTimestamps.put(resourceName,
+          customizedStateOutput.getResourceStartTimeMap(stateType, 
resourceName));
     }
   }
+
+  private CustomizedViewMonitor getOrCreateMonitor(ClusterEvent event) {
+    String clusterName = event.getClusterName();
+    if (_monitors.get(clusterName) == null) {
+      _monitors.put(clusterName, new CustomizedViewMonitor(clusterName));
+    }
+    return _monitors.get(clusterName);
+  }
+
+  private void asyncReportLatency(ExecutorService threadPool, 
CustomizedViewMonitor monitor,
+      List<CustomizedView> updatedCustomizedViews, Map<String, CustomizedView> 
curCustomizedViews,
+      Map<String, Map<Partition, Map<String, String>>> updatedStartTimestamps,
+      boolean[] updateSuccess, long curTime) {
+    AbstractBaseStage.asyncExecute(threadPool, () -> {
+      try {
+        monitor.reportLatency(updatedCustomizedViews, curCustomizedViews, 
updatedStartTimestamps,
+            updateSuccess, curTime);
+      } catch (Exception e) {
+        LOG.error("Failed to report UpdateToAggregationLatency metric.", e);

Review comment:
       I suggest `warn` level.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to