This is an automated email from the ASF dual-hosted git repository.

cyyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c0fecaa62 [improve] Added simple exponential backoff strategy (#3860)
2c0fecaa62 is described below

commit 2c0fecaa6262606fe00c7a9ee8bde96d23e03e76
Author: Duansg <[email protected]>
AuthorDate: Fri Nov 21 16:28:21 2025 +0800

    [improve] Added simple exponential backoff strategy (#3860)
    
    Signed-off-by: Duansg <[email protected]>
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Yang Chen <[email protected]>
---
 .../realtime/MetricsRealTimeAlertCalculator.java   | 28 ++++++---
 .../WindowedLogRealTimeAlertCalculator.java        | 19 ++++--
 .../hertzbeat/common/config/CommonProperties.java  |  5 ++
 .../common/queue/impl/KafkaCommonDataQueue.java    |  2 +
 .../common/queue/impl/RedisCommonDataQueue.java    |  9 ++-
 .../exception/CommonDataQueueUnknownException.java | 39 +++++++++++++
 .../apache/hertzbeat/common/util/BackoffUtils.java | 50 ++++++++++++++++
 .../hertzbeat/common/util/ExponentialBackoff.java  | 66 +++++++++++++++++++++
 .../queue/impl/RedisCommonDataQueueTest.java       |  2 +-
 .../hertzbeat/common/util/BackoffUtilsTest.java    | 68 ++++++++++++++++++++++
 .../common/util/ExponentialBackoffTest.java        | 54 +++++++++++++++++
 .../component/sd/ServiceDiscoveryWorker.java       | 24 +++++---
 .../warehouse/store/DataStorageDispatch.java       | 18 +++++-
 13 files changed, 360 insertions(+), 24 deletions(-)

diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
index 5fe19edd04..4597760502 100644
--- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
+++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
@@ -17,15 +17,6 @@
 
 package org.apache.hertzbeat.alert.calculate.realtime;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hertzbeat.alert.AlerterWorkerPool;
@@ -41,11 +32,24 @@ import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
 import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
 import org.apache.hertzbeat.common.util.CommonUtil;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 /**
  * Calculate alarms based on the alarm definition rules and collected data
  */
@@ -122,16 +126,22 @@ public class MetricsRealTimeAlertCalculator {
      */
     public void startCalculate() {
         Runnable runnable = () -> {
+            ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
             while (!Thread.currentThread().isInterrupted()) {
                 try {
                     CollectRep.MetricsData metricsData = 
dataQueue.pollMetricsDataToAlerter();
                     if (metricsData == null) {
                         continue;
                     }
+                    backoff.reset();
                     calculate(metricsData);
                     dataQueue.sendMetricsDataToStorage(metricsData);
                 } catch (InterruptedException ignored) {
                     Thread.currentThread().interrupt();
+                } catch (CommonDataQueueUnknownException ue) {
+                    if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+                        break;
+                    }
                 } catch (Exception e) {
                     log.error("calculate alarm error: {}.", e.getMessage(), e);
                 }
diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
index 8d93007838..90d12be6c8 100644
--- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
+++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
@@ -17,13 +17,17 @@
 
 package org.apache.hertzbeat.alert.calculate.realtime;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.alert.calculate.realtime.window.LogWorker;
 import org.apache.hertzbeat.alert.calculate.realtime.window.TimeService;
 import org.apache.hertzbeat.common.entity.log.LogEntry;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
 import org.springframework.stereotype.Component;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,16 +62,23 @@ public class WindowedLogRealTimeAlertCalculator implements Runnable {
 
     @Override
     public void run() {
+        ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
         while (!Thread.currentThread().isInterrupted()) {
             try {
                 LogEntry logEntry = dataQueue.pollLogEntry();
-                if (logEntry != null) {
-                    processLogEntry(logEntry);
-                    dataQueue.sendLogEntryToStorage(logEntry);
+                if (logEntry == null) {
+                    continue;
                 }
+                backoff.reset();
+                processLogEntry(logEntry);
+                dataQueue.sendLogEntryToStorage(logEntry);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 break;
+            } catch (CommonDataQueueUnknownException ue) {
+                if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+                    break;
+                }
             } catch (Exception e) {
                 log.error("Error in log dispatch loop: {}", e.getMessage(), e);
             }
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
index a70776f5fa..b88cc9c628 100644
--- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
@@ -125,6 +125,11 @@ public class CommonProperties {
          */
         private String logEntryToStorageQueueName;
 
+        /**
+         * Timeout for blocking wait in seconds (defaults to 1 second if not 
configured)
+         */
+        private Long waitTimeout;
+
     }
 
     /**
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
index 96d6b42eb9..a9286de13e 100644
--- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
@@ -33,6 +33,7 @@ import org.apache.hertzbeat.common.serialize.KafkaLogEntryDeserializer;
 import org.apache.hertzbeat.common.serialize.KafkaLogEntrySerializer;
 import org.apache.hertzbeat.common.serialize.KafkaMetricsDataDeserializer;
 import org.apache.hertzbeat.common.serialize.KafkaMetricsDataSerializer;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -176,6 +177,7 @@ public class KafkaCommonDataQueue implements CommonDataQueue, DisposableBean {
             dataConsumer.commitAsync();
         } catch (Exception e) {
             log.error(e.getMessage());
+            throw new CommonDataQueueUnknownException(e.getMessage(), e);
         } finally {
             lock.unlock();
         }
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
index ba69b25242..7f82f6372b 100644
--- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
@@ -30,10 +30,13 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
 import org.apache.hertzbeat.common.serialize.RedisLogEntryCodec;
 import org.apache.hertzbeat.common.serialize.RedisMetricsDataCodec;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Objects;
+
 /**
  * common data queue implement redis.
  */
@@ -57,6 +60,7 @@ public class RedisCommonDataQueue implements CommonDataQueue, DisposableBean {
     private final String logEntryQueueName;
     private final String logEntryToStorageQueueName;
     private final CommonProperties.RedisProperties redisProperties;
+    private final Long waitTimeout;
 
     public RedisCommonDataQueue(CommonProperties properties) {
 
@@ -84,6 +88,7 @@ public class RedisCommonDataQueue implements CommonDataQueue, DisposableBean {
         this.metricsDataQueueNameToAlerter = 
redisProperties.getMetricsDataQueueNameToAlerter();
         this.logEntryQueueName = redisProperties.getLogEntryQueueName();
         this.logEntryToStorageQueueName = 
redisProperties.getLogEntryToStorageQueueName();
+        this.waitTimeout = 
Objects.requireNonNullElse(redisProperties.getWaitTimeout(), 1L);
     }
 
     @Override
@@ -170,7 +175,7 @@ public class RedisCommonDataQueue implements CommonDataQueue, DisposableBean {
         try {
             // Use BRPOP for blocking pop with the configured timeout.
             // If data arrives, it returns immediately; if it times out, it 
returns null.
-            KeyValue<String, T> keyData = commands.brpop(1L, key);
+            KeyValue<String, T> keyData = commands.brpop(waitTimeout, key);
             if (keyData != null) {
                 return keyData.getValue();
             } else {
@@ -179,7 +184,7 @@ public class RedisCommonDataQueue implements CommonDataQueue, DisposableBean {
             }
         } catch (Exception e) {
             log.error("Redis BRPOP failed: {}", e.getMessage());
-            return null;
+            throw new CommonDataQueueUnknownException(e.getMessage(), e);
         }
     }
 
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java
new file mode 100644
index 0000000000..4be5aa31f2
--- /dev/null
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.support.exception;
+
+/**
+ * Unchecked wrapper for unexpected queue backend failures (e.g. a failed Redis BRPOP or Kafka poll).
+ */
+public class CommonDataQueueUnknownException extends RuntimeException {
+
+    public CommonDataQueueUnknownException() {
+    }
+
+    public CommonDataQueueUnknownException(String message) {
+        super(message);
+    }
+
+    public CommonDataQueueUnknownException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CommonDataQueueUnknownException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java
new file mode 100644
index 0000000000..678b32e175
--- /dev/null
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for sleeping between retries according to an {@link ExponentialBackoff}.
+ */
+public final class BackoffUtils {
+
+    /** Utility class; not meant to be instantiated. */
+    private BackoffUtils() {}
+
+    /**
+     * Sleeps for {@code backoff.nextDelay()} milliseconds, advancing the backoff by one step.
+     * If the thread is already interrupted, returns false at once (no sleep, backoff not advanced);
+     * if interrupted while sleeping, restores the interrupt flag and returns false.
+     *
+     * @param backoff the backoff supplying the delay; it is not thread-safe, so use one per thread
+     * @return true if the full delay elapsed and the caller may retry, false if it should stop
+     */
+    public static boolean shouldContinueAfterBackoff(ExponentialBackoff backoff) {
+        if (Thread.currentThread().isInterrupted()) {
+            return false;
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(backoff.nextDelay());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java
new file mode 100644
index 0000000000..0cf2af5dbc
--- /dev/null
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+/**
+ * Stateful exponential backoff whose delay starts at {@code initial} and doubles per step.
+ *
+ * <p>Each {@link #nextDelay()} returns the current delay and then doubles it, capped at {@code max}
+ * (initial=50, max=1000 yields 50, 100, 200, 400, 800, 1000, 1000, ...). The constructor rejects
+ * {@code initial <= 0} and {@code max < initial}. Call {@link #reset()} to restart from initial.</p>
+ *
+ * <p>Note: This class is <b>not</b> thread-safe. Each thread should use its own instance.</p>
+ */
+public final class ExponentialBackoff {
+    private final long initial;
+    private final long max;
+    private long current;
+
+    public ExponentialBackoff(long initial, long max) {
+        if (initial <= 0 || max < initial) {
+            throw new IllegalArgumentException("Invalid exponential backoff params");
+        }
+        this.initial = initial;
+        this.max = max;
+        this.current = initial;
+    }
+
+    /**
+     * Returns the current delay, then advances: doubles it, or sets it to {@code max} when doubling
+     * would exceed {@code max} (checking against {@code max / 2} first also rules out long overflow).
+     *
+     * @return the delay to wait before the next retry (BackoffUtils sleeps for it in milliseconds)
+     */
+    public long nextDelay() {
+        long delay = this.current;
+        if (this.current <= this.max / 2) {
+            this.current = this.current * 2;
+        } else {
+            this.current = this.max;
+        }
+        return delay;
+    }
+
+    /**
+     * Restores the delay to {@code initial}, so the next {@link #nextDelay()} returns it again.
+     * Callers do this after a successful operation so later failures restart from the short delay.
+     */
+    public void reset() {
+        this.current = this.initial;
+    }
+}
diff --git a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
index affb9a37ea..ab02c20b62 100644
--- a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
+++ b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
@@ -79,7 +79,7 @@ class RedisCommonDataQueueTest {
         
when(redisProperties.getLogEntryToStorageQueueName()).thenReturn("logEntryToStorageQueue");
         when(redisProperties.getRedisHost()).thenReturn("localhost");
         when(redisProperties.getRedisPort()).thenReturn(6379);
-
+        when(redisProperties.getWaitTimeout()).thenReturn(1L);
         try (MockedStatic<RedisClient> mockedRedisClient = 
mockStatic(RedisClient.class)) {
             mockedRedisClient.when(() -> 
RedisClient.create(any(RedisURI.class))).thenReturn(redisClient);
             
when(redisClient.connect(any(RedisMetricsDataCodec.class))).thenReturn(connection);
diff --git a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java
new file mode 100644
index 0000000000..f97bb8e5ba
--- /dev/null
+++ b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test case for {@link BackoffUtils}
+ */
+class BackoffUtilsTest {
+
+    @Test
+    void shouldContinueAfterBackoff() {
+        ExponentialBackoff backoff = new ExponentialBackoff(10L, 100L);
+        boolean shouldContinue = BackoffUtils.shouldContinueAfterBackoff(backoff);
+        assertTrue(shouldContinue);
+    }
+
+    @Test
+    void shouldNotContinueWhenInterrupted() {
+        Thread.currentThread().interrupt();
+        ExponentialBackoff backoff = new ExponentialBackoff(10L, 100L);
+        assertFalse(BackoffUtils.shouldContinueAfterBackoff(backoff));
+        assertTrue(Thread.interrupted()); // flag must be preserved; reading it also clears it for later tests
+    }
+
+    @Test
+    void shouldHandleInterruptedException() throws InterruptedException {
+        final Thread mainThread = Thread.currentThread();
+        ExponentialBackoff backoff = new ExponentialBackoff(1000L, 2000L);
+
+        Thread interruptingThread = new Thread(() -> {
+            try {
+                // Give the main thread some time to enter the sleep
+                Thread.sleep(200);
+                mainThread.interrupt();
+            } catch (InterruptedException ignored) {
+            }
+        });
+
+        interruptingThread.start();
+
+        boolean shouldContinue = BackoffUtils.shouldContinueAfterBackoff(backoff);
+        // Read-and-clear the flag before join(): join() throws InterruptedException while it is set
+        boolean interrupted = Thread.interrupted();
+        interruptingThread.join();
+        assertFalse(shouldContinue);
+        assertTrue(interrupted);
+    }
+}
\ No newline at end of file
diff --git a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java
new file mode 100644
index 0000000000..d82e8fbb91
--- /dev/null
+++ b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link ExponentialBackoff}: doubling with cap, reset, and constructor validation.
+ */
+class ExponentialBackoffTest {
+
+    @Test
+    void testProgressionAndCap() {
+        ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
+        Assertions.assertEquals(50L, backoff.nextDelay());
+        Assertions.assertEquals(100L, backoff.nextDelay());
+        Assertions.assertEquals(200L, backoff.nextDelay());
+        Assertions.assertEquals(400L, backoff.nextDelay());
+        Assertions.assertEquals(800L, backoff.nextDelay());
+        Assertions.assertEquals(1000L, backoff.nextDelay());
+        Assertions.assertEquals(1000L, backoff.nextDelay());
+    }
+
+    @Test
+    void testReset() {
+        ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
+        Assertions.assertEquals(50L, backoff.nextDelay());
+        Assertions.assertEquals(100L, backoff.nextDelay());
+        backoff.reset();
+        Assertions.assertEquals(50L, backoff.nextDelay());
+    }
+
+    @Test
+    void testInvalidParams() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new ExponentialBackoff(0L, 1000L));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new ExponentialBackoff(50L, 10L));
+    }
+}
diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
index 1ab255d72f..10aa00021b 100644
--- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
+++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
@@ -18,13 +18,6 @@
 package org.apache.hertzbeat.manager.component.sd;
 
 import com.google.common.collect.Maps;
-import java.time.LocalDateTime;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
@@ -34,6 +27,9 @@ import org.apache.hertzbeat.common.entity.manager.MonitorBind;
 import org.apache.hertzbeat.common.entity.manager.Param;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
 import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao;
 import org.apache.hertzbeat.manager.dao.MonitorBindDao;
 import org.apache.hertzbeat.manager.dao.MonitorDao;
@@ -43,6 +39,14 @@ import org.apache.hertzbeat.manager.service.MonitorService;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Service Discovery Worker
  */
@@ -80,11 +84,13 @@ public class ServiceDiscoveryWorker implements InitializingBean {
     private class SdUpdateTask implements Runnable {
         @Override
         public void run() {
+            ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
             while (!Thread.currentThread().isInterrupted()) {
                 try (final CollectRep.MetricsData metricsData = 
dataQueue.pollServiceDiscoveryData()) {
                     if (metricsData == null) {
                         continue;
                     }
+                    backoff.reset();
                     Long monitorId = metricsData.getId();
                     final Monitor mainMonitor = 
monitorDao.findById(monitorId).orElse(null);
                     if (mainMonitor == null) {
@@ -156,6 +162,10 @@ public class ServiceDiscoveryWorker implements InitializingBean {
                     final Set<Long> needCancelMonitorIdSet = 
subMonitorBindMap.values().stream()
                             
.map(MonitorBind::getMonitorId).collect(Collectors.toSet());
                     monitorService.deleteMonitors(needCancelMonitorIdSet);
+                } catch (CommonDataQueueUnknownException ue) {
+                    if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+                        break;
+                    }
                 } catch (Exception exception) {
                     log.error(exception.getMessage(), exception);
                 }
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index 802711065e..f0081e47ab 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -17,7 +17,6 @@
 
 package org.apache.hertzbeat.warehouse.store;
 
-import java.util.Optional;
 import jakarta.persistence.EntityManager;
 import jakarta.persistence.PersistenceContext;
 import lombok.extern.slf4j.Slf4j;
@@ -26,6 +25,9 @@ import org.apache.hertzbeat.common.entity.log.LogEntry;
 import org.apache.hertzbeat.common.entity.manager.Monitor;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import 
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
 import org.apache.hertzbeat.plugin.PostCollectPlugin;
 import org.apache.hertzbeat.plugin.runner.PluginRunner;
 import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
@@ -34,6 +36,8 @@ import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 
+import java.util.Optional;
+
 /**
  * dispatch storage metrics data
  */
@@ -69,12 +73,14 @@ public class DataStorageDispatch {
     protected void startPersistentDataStorage() {
         Runnable runnable = () -> {
             
Thread.currentThread().setName("warehouse-persistent-data-storage");
+            ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
             while (!Thread.currentThread().isInterrupted()) {
                 try {
                     CollectRep.MetricsData metricsData = 
commonDataQueue.pollMetricsDataToStorage();
                     if (metricsData == null) {
                         continue;
                     }
+                    backoff.reset();
                     try {
                         calculateMonitorStatus(metricsData);
                         historyDataWriter.ifPresent(dataWriter -> 
dataWriter.saveData(metricsData));
@@ -84,6 +90,10 @@ public class DataStorageDispatch {
                     }
                 } catch (InterruptedException interruptedException) {
                     Thread.currentThread().interrupt();
+                } catch (CommonDataQueueUnknownException ue) {
+                    if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+                        break;
+                    }
                 } catch (Exception e) {
                     log.error(e.getMessage(), e);
                 }
@@ -94,6 +104,7 @@ public class DataStorageDispatch {
 
     protected void startLogDataStorage() {
         Runnable runnable = () -> {
+            ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
             Thread.currentThread().setName("warehouse-log-data-storage");
             while (!Thread.currentThread().isInterrupted()) {
                 try {
@@ -101,6 +112,7 @@ public class DataStorageDispatch {
                     if (logEntry == null) {
                         continue;
                     }
+                    backoff.reset();
                     historyDataWriter.ifPresent(dataWriter -> {
                         try {
                             dataWriter.saveLogData(logEntry);
@@ -110,6 +122,10 @@ public class DataStorageDispatch {
                     });
                 } catch (InterruptedException interruptedException) {
                     Thread.currentThread().interrupt();
+                } catch (CommonDataQueueUnknownException ue) {
+                    if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+                        break;
+                    }
                 } catch (Exception e) {
                     log.error("Error in log data storage thread: {}", 
e.getMessage(), e);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to