desaikomal commented on code in PR #2344:
URL: https://github.com/apache/helix/pull/2344#discussion_r1088396600


##########
helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java:
##########
@@ -85,6 +85,9 @@ private void updateTopStateStatus(ResourceControllerDataProvider cache,
     if (cache.getClusterConfig() != null) {
       durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
     }
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.updateMissingTopStateDurationThreshold(durationThreshold);

Review Comment:
   If durationThreshold is Long.MAX_VALUE, our monitoring will never report 
anything: the thread will just keep spinning with no action.
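One possible way to address this, sketched under assumptions: treat `Long.MAX_VALUE` as "reporting disabled" and fold that into the condition under which the monitor thread parks, so it waits instead of looping. The class and helper names below (`ThresholdGuardDemo`, `isReportingEnabled`, `shouldPark`, `exceedsThreshold`) are hypothetical and are not part of the PR or of Helix.

```java
public class ThresholdGuardDemo {
  /** Hypothetical helper: Long.MAX_VALUE is treated as "reporting disabled". */
  public static boolean isReportingEnabled(long thresholdMs) {
    return thresholdMs < Long.MAX_VALUE;
  }

  /**
   * Hypothetical wait predicate: the monitor thread should park (wait) when
   * nothing is tracked OR when reporting is disabled, instead of iterating.
   */
  public static boolean shouldPark(int trackedResources, long thresholdMs) {
    return trackedResources == 0 || !isReportingEnabled(thresholdMs);
  }

  /** True only if reporting is enabled and the elapsed time is above the threshold. */
  public static boolean exceedsThreshold(long startMs, long nowMs, long thresholdMs) {
    return isReportingEnabled(thresholdMs) && nowMs - startMs > thresholdMs;
  }

  public static void main(String[] args) {
    // Three resources tracked but threshold unset: the thread should park.
    System.out.println(shouldPark(3, Long.MAX_VALUE));
    // Three resources tracked with a real threshold: the thread should work.
    System.out.println(shouldPark(3, 5_000L));
  }
}
```

With a predicate like this, whoever updates the threshold would also need to notify the waiting thread so that it re-evaluates the condition.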



##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,60 @@
 import org.slf4j.LoggerFactory;
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+  private class AsyncMissingTopStateMonitor extends Thread {
+    private final ConcurrentHashMap<String, Map<String, Long>> _missingTopStateResourceMap;
+    private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+    public AsyncMissingTopStateMonitor(ConcurrentHashMap<String, Map<String, Long>> missingTopStateResourceMap) {
+      _missingTopStateResourceMap = missingTopStateResourceMap;
+    }
+
+    public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+      _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (this) {

Review Comment:
   I am not sure I understand this. The whole object is locked by the 
synchronized block, and the block contains a while (true) loop that is 
constantly processing, so how can any other thread enter?



##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -583,6 +643,31 @@ public void updateMissingTopStateDurationStats(String resourceName, long totalDu
     }
   }
 
+  public void updateMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+    _asyncMissingTopStateMonitor.setMissingTopStateDurationThreshold(missingTopStateDurationThreshold);

Review Comment:
   This is where I have a question: since your _asyncMissingTopStateMonitor 
holds the synchronized lock and runs in a tight loop, will this method go 
through? Maybe when the monitor thread is in the "wait" state it releases the 
lock, so this method can go through.
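On the "wait" question: in Java, `Object.wait()` releases the monitor it was called on and reacquires it before returning, so another thread can enter a `synchronized` block on the same object while the first one waits. The following is a minimal, self-contained sketch of that behaviour; `WaitReleasesLockDemo` and its fields are illustrative names, not code from the PR. Note also that, as quoted in the diff, `setMissingTopStateDurationThreshold` is not declared `synchronized`, so it would not contend for the monitor in the first place.

```java
import java.util.concurrent.CountDownLatch;

public class WaitReleasesLockDemo {
  private final Object lock = new Object();
  private boolean hasWork = false;          // guarded by lock
  private long threshold = Long.MAX_VALUE;  // guarded by lock
  private long observedThreshold = -1L;     // guarded by lock

  /** Returns the threshold value the worker saw after being woken up. */
  public long run() {
    CountDownLatch workerHoldsLock = new CountDownLatch(1);
    Thread worker = new Thread(() -> {
      synchronized (lock) {
        // Signal while holding the lock; the caller can only get in once we wait().
        workerHoldsLock.countDown();
        try {
          while (!hasWork) {
            lock.wait();  // releases the monitor while waiting
          }
          observedThreshold = threshold;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    worker.start();
    try {
      workerHoldsLock.await();
      // This block is only reachable because wait() released the monitor.
      synchronized (lock) {
        threshold = 5_000L;
        hasWork = true;
        lock.notifyAll();
      }
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    synchronized (lock) {
      return observedThreshold;
    }
  }

  public static void main(String[] args) {
    System.out.println(new WaitReleasesLockDemo().run());  // prints 5000
  }
}
```

If `wait()` did not release the monitor, the caller's `synchronized` block would never be entered and the program would hang instead of printing a value.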



##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java:
##########
@@ -139,6 +142,10 @@ public ResourceMonitor(String clusterName, String resourceName, ObjectName objec
     _partitionTopStateNonGracefulHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateNonGracefulHandoffGauge", new Histogram(
             new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
+    _missingTopStateDurationGuage =
+        new HistogramDynamicMetric("MissingTopStateDurationGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));

Review Comment:
   Can this be your sleep duration?



##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,60 @@
 import org.slf4j.LoggerFactory;
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+  private class AsyncMissingTopStateMonitor extends Thread {
+    private final ConcurrentHashMap<String, Map<String, Long>> _missingTopStateResourceMap;
+    private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+    public AsyncMissingTopStateMonitor(ConcurrentHashMap<String, Map<String, Long>> missingTopStateResourceMap) {
+      _missingTopStateResourceMap = missingTopStateResourceMap;
+    }
+
+    public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+      _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (this) {
+          while (true) {
+            while (_missingTopStateResourceMap.size() == 0) {
+              this.wait();
+            }
+            for (Iterator<Map.Entry<String, Map<String, Long>>> resourcePartitionIt =
+                _missingTopStateResourceMap.entrySet().iterator(); resourcePartitionIt.hasNext(); ) {
+              Map.Entry<String, Map<String, Long>> resourcePartitionEntry = resourcePartitionIt.next();
+              // Iterate over all partitions and if any partition has missing top state greater than threshold then report
+              // it.
+              ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
+              // If all partitions of resource has top state recovered then reset the counter
+              if (resourcePartitionEntry.getValue().isEmpty()) {
+                resourceMonitor.resetMissingTopStateDurationGuage();
+                resourcePartitionIt.remove();

Review Comment:
   Is it OK to remove an entry from the map while you are iterating over it?
   In C++, I know we can't erase through an iterator inside the same loop 
unless the iterator is reassigned.
   Please check.
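For reference, `Iterator.remove()` is the supported way in Java to drop the current element during iteration, and the iterators of `ConcurrentHashMap` views are weakly consistent, so they do not throw `ConcurrentModificationException`. A minimal sketch of the same loop shape as the diff follows; `IteratorRemoveDemo` and `pruneEmpty` are illustrative names and not part of the PR.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IteratorRemoveDemo {
  /** Removes every resource whose partition map is empty, using Iterator.remove(). */
  public static void pruneEmpty(ConcurrentHashMap<String, Map<String, Long>> map) {
    for (Iterator<Map.Entry<String, Map<String, Long>>> it = map.entrySet().iterator();
        it.hasNext(); ) {
      Map.Entry<String, Map<String, Long>> entry = it.next();
      if (entry.getValue().isEmpty()) {
        it.remove();  // removes the entry last returned by next()
      }
    }
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, Map<String, Long>> map = new ConcurrentHashMap<>();
    map.put("recovered", new ConcurrentHashMap<>());
    Map<String, Long> stillMissing = new ConcurrentHashMap<>();
    stillMissing.put("partition_0", 1_000L);
    map.put("missing", stillMissing);

    pruneEmpty(map);
    System.out.println(map.keySet());  // prints [missing]
  }
}
```

What would not be safe on a plain `HashMap` is calling `map.remove(key)` directly inside a for-each loop over the same map; going through the iterator avoids that.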



##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,60 @@
 import org.slf4j.LoggerFactory;
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+  private class AsyncMissingTopStateMonitor extends Thread {
+    private final ConcurrentHashMap<String, Map<String, Long>> _missingTopStateResourceMap;
+    private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+    public AsyncMissingTopStateMonitor(ConcurrentHashMap<String, Map<String, Long>> missingTopStateResourceMap) {
+      _missingTopStateResourceMap = missingTopStateResourceMap;
+    }
+
+    public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+      _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (this) {
+          while (true) {
+            while (_missingTopStateResourceMap.size() == 0) {
+              this.wait();
+            }
+            for (Iterator<Map.Entry<String, Map<String, Long>>> resourcePartitionIt =
+                _missingTopStateResourceMap.entrySet().iterator(); resourcePartitionIt.hasNext(); ) {
+              Map.Entry<String, Map<String, Long>> resourcePartitionEntry = resourcePartitionIt.next();
+              // Iterate over all partitions and if any partition has missing top state greater than threshold then report
+              // it.
+              ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
+              // If all partitions of resource has top state recovered then reset the counter
+              if (resourcePartitionEntry.getValue().isEmpty()) {
+                resourceMonitor.resetMissingTopStateDurationGuage();
+                resourcePartitionIt.remove();
+              } else {
+                for (Long missingTopStateStartTime : resourcePartitionEntry.getValue().values()) {
+                  if (_missingTopStateDurationThreshold < Long.MAX_VALUE && System.currentTimeMillis() - missingTopStateStartTime > _missingTopStateDurationThreshold) {
+                    resourceMonitor.updateMissingTopStateDurationGuage(System.currentTimeMillis() - missingTopStateStartTime);
+                  }
+                }
+
+              }
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.error("AsyncMissingTopStateMonitor has been interrupted.", e);
+      }
+    }
+
+    public void reset() {
+      for (String resource : _missingTopStateResourceMap.keySet()) {
+        ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resource);

Review Comment:
   Can you just use getResourceMonitor() here? You don't want to create a 
resource monitor when you are on the reset path.
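To illustrate the difference between the two lookups, here is a small sketch with a stand-in registry. `MonitorRegistryDemo`, `FakeMonitor`, and their methods are hypothetical and only mirror the shape of a get-or-create versus a plain get; they are not the Helix classes.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MonitorRegistryDemo {
  /** Stand-in for a per-resource monitor; not the Helix ResourceMonitor class. */
  public static final class FakeMonitor {
    public long gauge = 42L;

    public void reset() {
      gauge = 0L;
    }
  }

  private final Map<String, FakeMonitor> monitors = new ConcurrentHashMap<>();

  /** Creates the monitor on first access: appropriate when reporting a metric. */
  public FakeMonitor getOrCreate(String resource) {
    return monitors.computeIfAbsent(resource, r -> new FakeMonitor());
  }

  /** Plain lookup: returns null for unknown resources and never creates one. */
  public FakeMonitor get(String resource) {
    return monitors.get(resource);
  }

  /** Reset path: only touches monitors that already exist. */
  public void resetAll(Iterable<String> resources) {
    for (String resource : resources) {
      FakeMonitor monitor = get(resource);
      if (monitor != null) {
        monitor.reset();
      }
    }
  }

  public int size() {
    return monitors.size();
  }

  public static void main(String[] args) {
    MonitorRegistryDemo registry = new MonitorRegistryDemo();
    registry.getOrCreate("db1");
    registry.resetAll(Arrays.asList("db1", "db2"));
    System.out.println(registry.size());  // prints 1: "db2" was not created by the reset
  }
}
```

Using the plain lookup on the reset path means a reset cannot leave behind monitors for resources that were never reported on.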



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
