junkaixue commented on code in PR #2344:
URL: https://github.com/apache/helix/pull/2344#discussion_r1083067900
##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,54 @@
import org.slf4j.LoggerFactory;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+ private class AsyncMissingTopStateMonitor extends Thread {
+ private final Map<String, Map<String, Long>> _missingTopStateResourceMap;
+ private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+ public AsyncMissingTopStateMonitor(Map<String, Map<String, Long>> missingTopStateResourceMap) {
+ _missingTopStateResourceMap = missingTopStateResourceMap;
+ }
+
+ public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+ _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+ }
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (true) {
+ while (_missingTopStateResourceMap.size() == 0) {
+ this.wait();
Review Comment:
The thread waits here, but where is the wake-up (notify) call? Would you
consider using a condition variable instead?
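For illustration, a minimal sketch of what a condition-variable version of this wait could look like, using `java.util.concurrent.locks.ReentrantLock` and `Condition`. The class `MissingTopStateSignal` and its method names are hypothetical and not part of the PR or of Helix; the point is only that the producer signals explicitly, so the wake-up path is visible next to the wait.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder class for illustration; not part of the Helix code base.
class MissingTopStateSignal {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Map<String, Map<String, Long>> missingTopState = new ConcurrentHashMap<>();

  // Producer side: record a partition that lost its top state and wake the monitor.
  void recordMissing(String resource, String partition, long startTimeMs) {
    lock.lock();
    try {
      missingTopState.computeIfAbsent(resource, r -> new ConcurrentHashMap<>())
          .put(partition, startTimeMs);
      notEmpty.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Consumer side: block until at least one resource is recorded, then return the count.
  int awaitWork() throws InterruptedException {
    lock.lock();
    try {
      while (missingTopState.isEmpty()) { // loop guards against spurious wakeups
        notEmpty.await();
      }
      return missingTopState.size();
    } finally {
      lock.unlock();
    }
  }
}
```

The monitor thread would call `awaitWork()` where the PR currently calls `this.wait()`, and whichever code adds entries to the map would go through `recordMissing(...)`.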
##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java:
##########
@@ -335,12 +340,21 @@ private void resetResourceStateGauges() {
_numOfPartitionsInExternalView.updateValue(0L);
_numLessMinActiveReplicaPartitions.updateValue(0L);
_numLessReplicaPartitions.updateValue(0L);
+ _oneOrManyPartitionsMissingTopStateRealTimeGuage.updateValue(0L);
Review Comment:
This is a very long name. The gauge also already represents the number of
partitions missing top state, as an aggregate. Better: _numPartitionsMissingTopStateRealTime
##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,54 @@
import org.slf4j.LoggerFactory;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+ private class AsyncMissingTopStateMonitor extends Thread {
+ private final Map<String, Map<String, Long>> _missingTopStateResourceMap;
+ private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+ public AsyncMissingTopStateMonitor(Map<String, Map<String, Long>> missingTopStateResourceMap) {
+ _missingTopStateResourceMap = missingTopStateResourceMap;
+ }
+
+ public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+ _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+ }
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (true) {
+ while (_missingTopStateResourceMap.size() == 0) {
+ this.wait();
+ }
+ for (Iterator<Map.Entry<String, Map<String, Long>>> resourcePartitionIt =
+ _missingTopStateResourceMap.entrySet().iterator(); resourcePartitionIt.hasNext(); ) {
+ Map.Entry<String, Map<String, Long>> resourcePartitionEntry = resourcePartitionIt.next();
+ // Iterate over all partitions and if any partition has missing top state greater than threshold then report
+ // it.
+ ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
+ // If all partitions of resource has top state recovered then reset the counter
+ if (resourcePartitionEntry.getValue().isEmpty()) {
+ resourceMonitor.resetOneOrManyPartitionsMissingTopStateRealTimeGuage();
+ resourcePartitionIt.remove();
+ } else {
+ for (Long missingTopStateStartTime : resourcePartitionEntry.getValue().values()) {
+ if (_missingTopStateDurationThreshold < Long.MAX_VALUE && System.currentTimeMillis() - missingTopStateStartTime > _missingTopStateDurationThreshold) {
+ resourceMonitor.updateOneOrManyPartitionsMissingTopStateRealTimeGuage();
+ }
+ }
+ }
+ }
+ // TODO: Check if this SLEEP_TIME is correct? Thread should keep on increasing the counter continuously until top
+ // state is recovered but it can sleep for reasonable amount of time in between.
+ sleep(100);
Review Comment:
As I mentioned above, instead of using sleep, it may be better to use a
condition variable.
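As a rough sketch of that suggestion: a timed wait on a `Condition` can stand in for `sleep(100)`. In the quoted hunk the `sleep` call appears to sit inside the `synchronized (this)` block, and `Thread.sleep` does not release the monitor, whereas `Condition.awaitNanos` releases its lock while waiting and returns early when signalled. The class `PeriodicWaiter` and its methods are hypothetical names chosen for this example, not part of the PR.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; names are not taken from the PR.
class PeriodicWaiter {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition wakeUp = lock.newCondition();
  private boolean changed = false;

  // Called by whoever updates the missing-top-state map or the threshold.
  void signalChange() {
    lock.lock();
    try {
      changed = true;
      wakeUp.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Waits up to timeoutMs for a change. Returns true if a change was signalled,
  // false if the timeout elapsed first. The lock is released while waiting.
  boolean awaitChange(long timeoutMs) throws InterruptedException {
    lock.lock();
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (!changed && remainingNanos > 0) {
        remainingNanos = wakeUp.awaitNanos(remainingNanos);
      }
      boolean result = changed;
      changed = false; // consume the signal so the next wait starts fresh
      return result;
    } finally {
      lock.unlock();
    }
  }
}
```

With something like this, the monitor loop could call `awaitChange(100)` between passes: it still wakes at least every 100 ms to refresh the gauge, but it also reacts immediately when the map or threshold changes.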
##########
helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java:
##########
@@ -319,12 +322,17 @@ private void reportTopStateHandoffFailIfNecessary(ResourceControllerDataProvider
String partitionName = partition.getPartitionName();
MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
long startTime = record.getStartTimeStamp();
- if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold && !record
+ if (startTime > 0 && !record
.isFailed()) {
Review Comment:
+1. If this is a refactoring, it would be better to do it in a separate PR
without logic changes. That would make it easier to review.
##########
helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java:
##########
@@ -55,6 +55,54 @@
import org.slf4j.LoggerFactory;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
+ private class AsyncMissingTopStateMonitor extends Thread {
+ private final Map<String, Map<String, Long>> _missingTopStateResourceMap;
+ private long _missingTopStateDurationThreshold = Long.MAX_VALUE;;
+
+ public AsyncMissingTopStateMonitor(Map<String, Map<String, Long>> missingTopStateResourceMap) {
+ _missingTopStateResourceMap = missingTopStateResourceMap;
+ }
+
+ public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
+ _missingTopStateDurationThreshold = missingTopStateDurationThreshold;
+ }
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (true) {
+ while (_missingTopStateResourceMap.size() == 0) {
+ this.wait();
+ }
+ for (Iterator<Map.Entry<String, Map<String, Long>>> resourcePartitionIt =
+ _missingTopStateResourceMap.entrySet().iterator(); resourcePartitionIt.hasNext(); ) {
+ Map.Entry<String, Map<String, Long>> resourcePartitionEntry = resourcePartitionIt.next();
+ // Iterate over all partitions and if any partition has missing top state greater than threshold then report
+ // it.
+ ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
+ // If all partitions of resource has top state recovered then reset the counter
+ if (resourcePartitionEntry.getValue().isEmpty()) {
+ resourceMonitor.resetOneOrManyPartitionsMissingTopStateRealTimeGuage();
+ resourcePartitionIt.remove();
+ } else {
+ for (Long missingTopStateStartTime : resourcePartitionEntry.getValue().values()) {
+ if (_missingTopStateDurationThreshold < Long.MAX_VALUE && System.currentTimeMillis() - missingTopStateStartTime > _missingTopStateDurationThreshold) {
Review Comment:
You could use Java's parallel streams with lambdas here.
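One possible reading of this suggestion, sketched with `Collection.parallelStream()`: count the partitions whose top state has been missing longer than the threshold. The class `MissingTopStateCounter` and its method are hypothetical. The sketch covers only the counting step; it leaves out the gauge updates and entry removal that the quoted loop performs as side effects, which would need separate handling. Whether parallelism pays off for maps of this size is also an open question.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the PR.
class MissingTopStateCounter {
  // Counts partitions whose top state has been missing longer than thresholdMs.
  // The outer map is keyed by resource name, the inner map by partition name,
  // and the inner values are the timestamps (ms) at which the top state went missing.
  static long countOverThreshold(
      Map<String, Map<String, Long>> missingTopState, long nowMs, long thresholdMs) {
    return missingTopState.values().parallelStream()
        .flatMap(partitions -> partitions.values().stream())
        .filter(startMs -> nowMs - startMs > thresholdMs)
        .count();
  }
}
```

Passing the current time in as `nowMs` keeps the predicate free of side effects, so every partition is compared against the same instant.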
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]