OneSizeFitsQuorum commented on code in PR #14866:
URL: https://github.com/apache/iotdb/pull/14866#discussion_r1971293325


##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DetectorTest {
+
+  final long sec = 1_000_000_000L;
+  final PhiAccrualDetector phiAccrualDetector =
+      new PhiAccrualDetector(35, 10 * sec, sec, (long) (0.2 * sec));
+  final FixedDetector fixedDetector = new FixedDetector(20 * sec);
+
+  private double getPhi(long elapsed, double[] intervals, long minStd, long 
pause) {
+    final PhiAccrualDetector.PhiAccrual p =
+        new PhiAccrualDetector.PhiAccrual(intervals, elapsed, minStd, pause);
+    return p.phi();
+  }
+
+  private void assertInRange(double value, double start, double end) {
+    Assert.assertTrue(value > start);
+    Assert.assertTrue(value < end);
+  }
+
+  @Test
+  public void testFixedDetector() {
+    final long lastHeartbeatTs = System.nanoTime() - 21 * sec;
+    final List<AbstractHeartbeatSample> history =
+        Collections.singletonList(new NodeHeartbeatSample(lastHeartbeatTs, 
NodeStatus.Unknown));
+    Assert.assertFalse(fixedDetector.isAvailable(history));
+  }
+
+  @Test
+  public void testPhiCalculation1() {
+    /* (min, std, acceptable_pause) = (1000, 200, 0) */
+    final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+    final long minStd = 200;
+    final long pause = 0;
+
+    assertInRange(getPhi(1000, heartbeatIntervals, minStd, pause), 0, 1);
+    assertInRange(getPhi(2000, heartbeatIntervals, minStd, pause), 5, 10);
+    assertInRange(getPhi(3000, heartbeatIntervals, minStd, pause), 35, 50);
+  }
+
+  @Test
+  public void testPhiCalculation2() {
+    /* (min, std, acceptable_pause) = (1000, 300, 0) */
+    /* (min, std, acceptable_pause) = (1000, 300, 5000) */

Review Comment:
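   Where is 5000 used? The comment above lists (min, std, acceptable_pause) = (1000, 300, 5000), but which case exercises the 5000 acceptable pause?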



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.tsfile.utils.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PhiAccrualDetector implements IFailureDetector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhiAccrualDetector.class);
+  private final long threshold;
+  private final long acceptableHeartbeatPauseNs;
+  private final long firstHeartbeatEstimateNs;
+  private final long minHeartbeatStdNs;
+
+  public PhiAccrualDetector(
+      long threshold,
+      long acceptableHeartbeatPauseNs,
+      long firstHeartbeatEstimateNs,
+      long minHeartbeatStdNs) {
+    this.threshold = threshold;
+    this.acceptableHeartbeatPauseNs = acceptableHeartbeatPauseNs;
+    this.firstHeartbeatEstimateNs = firstHeartbeatEstimateNs;
+    this.minHeartbeatStdNs = minHeartbeatStdNs;
+  }
+
+  @Override
+  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+    if (history.isEmpty()) {
+      /* We haven't received the first heartbeat reply. We cannot decide the 
node availability. */
+      return true;
+    }
+    final PhiAccrual phiAccrual = create(history);
+    final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
+    if (!isAvailable) {
+      // log the status change and dump the heartbeat history for analysis use
+      final StringBuilder builder = new StringBuilder();
+      builder.append("[");
+      for (double interval : phiAccrual.heartbeatIntervals) {
+        final long msInterval = (long) interval / 1000_000;
+        builder.append(msInterval).append(", ");
+      }
+      builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
+      builder.append("]");
+      LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", 
builder));
+    }
+
+    return isAvailable;
+  }
+
+  PhiAccrual create(List<AbstractHeartbeatSample> history) {
+    final int size = history.size();
+
+    final List<Double> heartbeatIntervals = new ArrayList<>();
+    /*
+     * During cold start, the heartbeat history may contain not enough samples 
for wise decisions.
+     * Therefore, we manually add data samples for better estimation.
+     * 1. mean = heartbeat interval (1000ms)
+     * 2. std = mean / 4
+     */
+    if (size <= 2) {

Review Comment:
   Try to make the cold-start process smoother.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java:
##########
@@ -44,8 +44,7 @@ public synchronized void updateCurrentStatistics(boolean 
forceUpdate) {
     long currentNanoTime = System.nanoTime();
     if (lastSample == null) {
       status = RegionStatus.Unknown;
-    } else if (currentNanoTime - lastSample.getSampleLogicalTimestamp()
-        > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
+    } else if (currentNanoTime - lastSample.getSampleLogicalTimestamp() > 
heartbeatTimeoutNs) {

Review Comment:
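   Try to use the same failure detector here (the `failureDetector` from AbstractLoadCache) instead of comparing against the fixed `heartbeatTimeoutNs`.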



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java:
##########
@@ -33,16 +38,35 @@ public abstract class AbstractLoadCache {
   // Max heartbeat cache samples store size
   private static final int MAXIMUM_WINDOW_SIZE = 100;
   // The Status will be set to Unknown when the response time of heartbeat is 
more than 20s
-  protected static final long HEARTBEAT_TIMEOUT_TIME_IN_NS = 20_000_000_000L;
+  protected final long heartbeatTimeoutNs;
 
   // Caching the recent MAXIMUM_WINDOW_SIZE heartbeat sample
   protected final List<AbstractHeartbeatSample> slidingWindow;
   // The current statistics calculated by the latest heartbeat sample
   protected final AtomicReference<AbstractStatistics> currentStatistics;
 
+  protected final IFailureDetector failureDetector;
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
   protected AbstractLoadCache() {
     this.currentStatistics = new AtomicReference<>();
     this.slidingWindow = Collections.synchronizedList(new LinkedList<>());
+    this.heartbeatTimeoutNs = CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L;
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 1000_000L,
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L);

Review Comment:
   Isn't this too big for the std? The acceptable-pause value is passed as the fourth argument, `minHeartbeatStdNs`.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -684,6 +684,26 @@ time_partition_interval=604800000
 # Datatype: long
 heartbeat_interval_in_ms=1000
 
+# Default failure detector, enum from {fixed, phi_accrual}
+# effectiveMode: restart
+# Datatype: string
+failure_detector=fixed
+
+# Fixed failure detector threshold of time elapsed without receiving heartbeat 
replies
+# effectiveMode: restart
+# Datatype: long
+failure_detector_fixed_threshold_in_ms=20000
+
+# Phi accrual: threshold to determine a node is down
+# effectiveMode: restart
+# Datatype: long
+failure_detector_phi_threshold=30

Review Comment:
   Got it



##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DetectorTest {

Review Comment:
   Please add a cold-start test (e.g. a heartbeat history with only one or two samples).



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java:
##########
@@ -40,9 +45,27 @@ public abstract class AbstractLoadCache {
   // The current statistics calculated by the latest heartbeat sample

Review Comment:
   The RegionCache's logical clock also uses nanoTime. It's best to unify them;
otherwise you might end up in a strange state where the node is Unknown
but its region is Running.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java:
##########
@@ -54,8 +56,21 @@ public boolean isAvailable(List<AbstractHeartbeatSample> 
history) {
       return true;
     }
     final PhiAccrual phiAccrual = create(history);
-    // TODO(szywilliam) we should log the status change and dump the heartbeat 
history
-    return phiAccrual.phi() < (double) this.threshold;
+    final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
+    if (!isAvailable) {
+      // log the status change and dump the heartbeat history for analysis use
+      final StringBuilder builder = new StringBuilder();
+      builder.append("[");
+      for (double interval : phiAccrual.heartbeatIntervals) {
+        final long msInterval = (long) interval / 1000_000;
+        builder.append(msInterval).append(", ");
+      }
+      builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
+      builder.append("]");
+      LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", 
builder));
+    }
+

Review Comment:
   Remove the empty line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to