rahulrane50 commented on code in PR #2344:
URL: https://github.com/apache/helix/pull/2344#discussion_r1088535288


##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,60 @@
 import org.slf4j.LoggerFactory;
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+  private class AsyncMissingTopStateMonitor extends Thread {
+    private final ConcurrentHashMap<String, Map<String, Long>> 
_missingTopStateResourceMap;
+    private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+    public AsyncMissingTopStateMonitor(ConcurrentHashMap<String, Map<String, 
Long>> missingTopStateResourceMap) {
+      _missingTopStateResourceMap = missingTopStateResourceMap;
+    }
+
+    public void setMissingTopStateDurationThreshold(long 
missingTopStateDurationThreshold) {
+      _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+    }
+
+    @Override
+    public void run() {
+      try {
+        synchronized (this) {
+          while (true) {
+            while (_missingTopStateResourceMap.size() == 0) {
+              this.wait();
+            }
+            for (Iterator<Map.Entry<String, Map<String, Long>>> 
resourcePartitionIt =
+                _missingTopStateResourceMap.entrySet().iterator(); 
resourcePartitionIt.hasNext(); ) {
+              Map.Entry<String, Map<String, Long>> resourcePartitionEntry = 
resourcePartitionIt.next();
+              // Iterate over all partitions and if any partition has missing 
top state greater than threshold then report
+              // it.
+              ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
+              // If all partitions of resource has top state recovered then 
reset the counter
+              if (resourcePartitionEntry.getValue().isEmpty()) {
+                resourceMonitor.resetMissingTopStateDurationGuage();
+                resourcePartitionIt.remove();
+              } else {
+                for (Long missingTopStateStartTime : 
resourcePartitionEntry.getValue().values()) {
+                  if (_missingTopStateDurationThreshold < Long.MAX_VALUE && 
System.currentTimeMillis() - missingTopStateStartTime > 
_missingTopStateDurationThreshold) {
+                    
resourceMonitor.updateMissingTopStateDurationGuage(System.currentTimeMillis() - 
missingTopStateStartTime);
+                  }
+                }
+
+              }
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.error("AsyncMissingTopStateMonitor has been interrupted.", e);
+      }
+    }
+
+    public void reset() {
+      for (String resource : _missingTopStateResourceMap.keySet()) {
+        ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resource);

Review Comment:
   The comment above getResourceMonitor() says that method is only for tests. 
I think that's because it's not synchronized like getOrCreateResourceMonitor(). 
Anyway, getOrCreateResourceMonitor() doesn't create a new resource monitor 
every time, which is why it's used everywhere else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to