This is an automated email from the ASF dual-hosted git repository.
cyyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0fecaa62 [improve] Added simple exponential backoff strategy (#3860)
2c0fecaa62 is described below
commit 2c0fecaa6262606fe00c7a9ee8bde96d23e03e76
Author: Duansg <[email protected]>
AuthorDate: Fri Nov 21 16:28:21 2025 +0800
[improve] Added simple exponential backoff strategy (#3860)
Signed-off-by: Duansg <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Yang Chen <[email protected]>
---
.../realtime/MetricsRealTimeAlertCalculator.java | 28 ++++++---
.../WindowedLogRealTimeAlertCalculator.java | 19 ++++--
.../hertzbeat/common/config/CommonProperties.java | 5 ++
.../common/queue/impl/KafkaCommonDataQueue.java | 2 +
.../common/queue/impl/RedisCommonDataQueue.java | 9 ++-
.../exception/CommonDataQueueUnknownException.java | 39 +++++++++++++
.../apache/hertzbeat/common/util/BackoffUtils.java | 50 ++++++++++++++++
.../hertzbeat/common/util/ExponentialBackoff.java | 66 +++++++++++++++++++++
.../queue/impl/RedisCommonDataQueueTest.java | 2 +-
.../hertzbeat/common/util/BackoffUtilsTest.java | 68 ++++++++++++++++++++++
.../common/util/ExponentialBackoffTest.java | 54 +++++++++++++++++
.../component/sd/ServiceDiscoveryWorker.java | 24 +++++---
.../warehouse/store/DataStorageDispatch.java | 18 +++++-
13 files changed, 360 insertions(+), 24 deletions(-)
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
index 5fe19edd04..4597760502 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
@@ -17,15 +17,6 @@
package org.apache.hertzbeat.alert.calculate.realtime;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
@@ -41,11 +32,24 @@ import
org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
import org.apache.hertzbeat.common.util.CommonUtil;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
/**
* Calculate alarms based on the alarm definition rules and collected data
*/
@@ -122,16 +126,22 @@ public class MetricsRealTimeAlertCalculator {
*/
public void startCalculate() {
Runnable runnable = () -> {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData =
dataQueue.pollMetricsDataToAlerter();
if (metricsData == null) {
continue;
}
+ backoff.reset();
calculate(metricsData);
dataQueue.sendMetricsDataToStorage(metricsData);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
+ } catch (CommonDataQueueUnknownException ue) {
+ if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+ break;
+ }
} catch (Exception e) {
log.error("calculate alarm error: {}.", e.getMessage(), e);
}
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
index 8d93007838..90d12be6c8 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/WindowedLogRealTimeAlertCalculator.java
@@ -17,13 +17,17 @@
package org.apache.hertzbeat.alert.calculate.realtime;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.calculate.realtime.window.LogWorker;
import org.apache.hertzbeat.alert.calculate.realtime.window.TimeService;
import org.apache.hertzbeat.common.entity.log.LogEntry;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
import org.springframework.stereotype.Component;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.LinkedBlockingQueue;
@@ -58,16 +62,23 @@ public class WindowedLogRealTimeAlertCalculator implements
Runnable {
@Override
public void run() {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
while (!Thread.currentThread().isInterrupted()) {
try {
LogEntry logEntry = dataQueue.pollLogEntry();
- if (logEntry != null) {
- processLogEntry(logEntry);
- dataQueue.sendLogEntryToStorage(logEntry);
+ if (logEntry == null) {
+ continue;
}
+ backoff.reset();
+ processLogEntry(logEntry);
+ dataQueue.sendLogEntryToStorage(logEntry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
+ } catch (CommonDataQueueUnknownException ue) {
+ if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+ break;
+ }
} catch (Exception e) {
log.error("Error in log dispatch loop: {}", e.getMessage(), e);
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
index a70776f5fa..b88cc9c628 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/config/CommonProperties.java
@@ -125,6 +125,11 @@ public class CommonProperties {
*/
private String logEntryToStorageQueueName;
+ /**
+ * Timeout for blocking wait in seconds (defaults to 1 second if not
configured)
+ */
+ private Long waitTimeout;
+
}
/**
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
index 96d6b42eb9..a9286de13e 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/KafkaCommonDataQueue.java
@@ -33,6 +33,7 @@ import
org.apache.hertzbeat.common.serialize.KafkaLogEntryDeserializer;
import org.apache.hertzbeat.common.serialize.KafkaLogEntrySerializer;
import org.apache.hertzbeat.common.serialize.KafkaMetricsDataDeserializer;
import org.apache.hertzbeat.common.serialize.KafkaMetricsDataSerializer;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -176,6 +177,7 @@ public class KafkaCommonDataQueue implements
CommonDataQueue, DisposableBean {
dataConsumer.commitAsync();
} catch (Exception e) {
log.error(e.getMessage());
+ throw new CommonDataQueueUnknownException(e.getMessage(), e);
} finally {
lock.unlock();
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
index ba69b25242..7f82f6372b 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
@@ -30,10 +30,13 @@ import
org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.apache.hertzbeat.common.serialize.RedisLogEntryCodec;
import org.apache.hertzbeat.common.serialize.RedisMetricsDataCodec;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
+import java.util.Objects;
+
/**
* common data queue implement redis.
*/
@@ -57,6 +60,7 @@ public class RedisCommonDataQueue implements CommonDataQueue,
DisposableBean {
private final String logEntryQueueName;
private final String logEntryToStorageQueueName;
private final CommonProperties.RedisProperties redisProperties;
+ private final Long waitTimeout;
public RedisCommonDataQueue(CommonProperties properties) {
@@ -84,6 +88,7 @@ public class RedisCommonDataQueue implements CommonDataQueue,
DisposableBean {
this.metricsDataQueueNameToAlerter =
redisProperties.getMetricsDataQueueNameToAlerter();
this.logEntryQueueName = redisProperties.getLogEntryQueueName();
this.logEntryToStorageQueueName =
redisProperties.getLogEntryToStorageQueueName();
+ this.waitTimeout =
Objects.requireNonNullElse(redisProperties.getWaitTimeout(), 1L);
}
@Override
@@ -170,7 +175,7 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
try {
// Use BRPOP for blocking pop with the configured timeout.
// If data arrives, it returns immediately; if it times out, it
returns null.
- KeyValue<String, T> keyData = commands.brpop(1L, key);
+ KeyValue<String, T> keyData = commands.brpop(waitTimeout, key);
if (keyData != null) {
return keyData.getValue();
} else {
@@ -179,7 +184,7 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
}
} catch (Exception e) {
log.error("Redis BRPOP failed: {}", e.getMessage());
- return null;
+ throw new CommonDataQueueUnknownException(e.getMessage(), e);
}
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java
new file mode 100644
index 0000000000..4be5aa31f2
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/exception/CommonDataQueueUnknownException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.support.exception;
+
+/**
+ * Unchecked exception wrapping an unexpected failure inside a common data queue implementation.
+ * When the underlying error is supplied it is kept as the cause.
+ */
+public class CommonDataQueueUnknownException extends RuntimeException {
+
+    public CommonDataQueueUnknownException() {}
+
+    public CommonDataQueueUnknownException(String message) {
+        super(message);
+    }
+
+    public CommonDataQueueUnknownException(Throwable cause) {
+        super(cause);
+    }
+
+    public CommonDataQueueUnknownException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java
new file mode 100644
index 0000000000..678b32e175
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/BackoffUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Backoff utility class.
+ */
+public final class BackoffUtils {
+
+ /** Private constructor to prevent instantiation */
+ private BackoffUtils() {}
+
+ /**
+ * Sleeps for the next delay specified by the ExponentialBackoff instance.
+ * If the thread is interrupted during sleep, it resets the interrupt
status
+ * and returns false to indicate that the operation should not continue.
+ *
+ * @param backoff the ExponentialBackoff instance to get the next delay
from
+ * @return true if the sleep completed without interruption, false
otherwise
+ */
+ public static boolean shouldContinueAfterBackoff(ExponentialBackoff
backoff) {
+ if (Thread.currentThread().isInterrupted()) {
+ return false;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(backoff.nextDelay());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java
new file mode 100644
index 0000000000..0cf2af5dbc
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ExponentialBackoff.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+/**
+ * Stateful, capped exponential backoff calculator.
+ *
+ * <p>The first call to {@link #nextDelay()} yields the initial delay; each later call
+ * yields twice the previous one, saturating at the configured maximum.
+ * Call {@link #reset()} to start the sequence over.</p>
+ *
+ * <p>Note: instances hold mutable state and are <b>not</b> thread-safe; use one per thread.</p>
+ */
+public final class ExponentialBackoff {
+    private final long initial;
+    private final long max;
+    private long current;
+
+    public ExponentialBackoff(long initial, long max) {
+        if (!(initial > 0 && max >= initial)) {
+            throw new IllegalArgumentException("Invalid exponential backoff params");
+        }
+        this.initial = initial;
+        this.max = max;
+        this.current = initial;
+    }
+
+    /**
+     * Hands out the delay for the current attempt, then doubles the stored value
+     * for the following attempt without ever exceeding the cap.
+     *
+     * <p>The value is unit-agnostic: it is in whatever unit the constructor arguments
+     * use ({@link BackoffUtils} sleeps for it in milliseconds).</p>
+     *
+     * @return the delay to wait before the next retry
+     */
+    public long nextDelay() {
+        final long delay = current;
+        // compare with max / 2 instead of computing current * 2 first, so doubling cannot overflow
+        current = (current <= max / 2) ? current * 2 : max;
+        return delay;
+    }
+
+    /**
+     * Rewinds the sequence so the next {@link #nextDelay()} returns the initial delay again.
+     * Intended to be called once an operation succeeds after earlier failures.
+     */
+    public void reset() {
+        current = initial;
+    }
+}
diff --git
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
index affb9a37ea..ab02c20b62 100644
---
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
+++
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
@@ -79,7 +79,7 @@ class RedisCommonDataQueueTest {
when(redisProperties.getLogEntryToStorageQueueName()).thenReturn("logEntryToStorageQueue");
when(redisProperties.getRedisHost()).thenReturn("localhost");
when(redisProperties.getRedisPort()).thenReturn(6379);
-
+ when(redisProperties.getWaitTimeout()).thenReturn(1L);
try (MockedStatic<RedisClient> mockedRedisClient =
mockStatic(RedisClient.class)) {
mockedRedisClient.when(() ->
RedisClient.create(any(RedisURI.class))).thenReturn(redisClient);
when(redisClient.connect(any(RedisMetricsDataCodec.class))).thenReturn(connection);
diff --git
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java
new file mode 100644
index 0000000000..f97bb8e5ba
--- /dev/null
+++
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/BackoffUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test case for {@link BackoffUtils}. */
+class BackoffUtilsTest {
+
+    @Test
+    void shouldContinueAfterBackoff() {
+        ExponentialBackoff backoff = new ExponentialBackoff(10L, 100L);
+        boolean shouldContinue = BackoffUtils.shouldContinueAfterBackoff(backoff);
+        assertTrue(shouldContinue);
+    }
+
+    @Test
+    void shouldNotContinueWhenInterrupted() {
+        Thread.currentThread().interrupt();
+        ExponentialBackoff backoff = new ExponentialBackoff(10L, 100L);
+        assertFalse(BackoffUtils.shouldContinueAfterBackoff(backoff));
+        // Thread.interrupted() verifies the flag was left set and clears it, so the
+        // interrupt cannot leak into later tests that run on this same thread.
+        assertTrue(Thread.interrupted());
+    }
+
+    @Test
+    void shouldHandleInterruptedException() throws InterruptedException {
+        final Thread mainThread = Thread.currentThread();
+        ExponentialBackoff backoff = new ExponentialBackoff(1000L, 2000L);
+
+        Thread interruptingThread = new Thread(() -> {
+            try {
+                // Give the main thread some time to enter the sleep
+                Thread.sleep(200);
+                mainThread.interrupt();
+            } catch (InterruptedException ignored) {
+            }
+        });
+
+        interruptingThread.start();
+        boolean shouldContinue = BackoffUtils.shouldContinueAfterBackoff(backoff);
+        // Read and clear the restored flag BEFORE join(): join() throws if the flag is still set while the helper is alive
+        boolean interruptRestored = Thread.interrupted();
+        interruptingThread.join();
+
+        assertFalse(shouldContinue);
+        assertTrue(interruptRestored);
+    }
+}
\ No newline at end of file
diff --git
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java
new file mode 100644
index 0000000000..d82e8fbb91
--- /dev/null
+++
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ExponentialBackoffTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test case for {@link ExponentialBackoff}
+ */
+class ExponentialBackoffTest {
+
+ @Test
+ void testProgressionAndCap() {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
+ Assertions.assertEquals(50L, backoff.nextDelay());
+ Assertions.assertEquals(100L, backoff.nextDelay());
+ Assertions.assertEquals(200L, backoff.nextDelay());
+ Assertions.assertEquals(400L, backoff.nextDelay());
+ Assertions.assertEquals(800L, backoff.nextDelay());
+ Assertions.assertEquals(1000L, backoff.nextDelay());
+ Assertions.assertEquals(1000L, backoff.nextDelay());
+ }
+
+ @Test
+ void testReset() {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
+ Assertions.assertEquals(50L, backoff.nextDelay());
+ Assertions.assertEquals(100L, backoff.nextDelay());
+ backoff.reset();
+ Assertions.assertEquals(50L, backoff.nextDelay());
+ }
+
+ @Test
+ void testInvalidParams() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new
ExponentialBackoff(0L, 1000L));
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new
ExponentialBackoff(50L, 10L));
+ }
+}
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
index 1ab255d72f..10aa00021b 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
@@ -18,13 +18,6 @@
package org.apache.hertzbeat.manager.component.sd;
import com.google.common.collect.Maps;
-import java.time.LocalDateTime;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
@@ -34,6 +27,9 @@ import org.apache.hertzbeat.common.entity.manager.MonitorBind;
import org.apache.hertzbeat.common.entity.manager.Param;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao;
import org.apache.hertzbeat.manager.dao.MonitorBindDao;
import org.apache.hertzbeat.manager.dao.MonitorDao;
@@ -43,6 +39,14 @@ import org.apache.hertzbeat.manager.service.MonitorService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
+import java.time.LocalDateTime;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Service Discovery Worker
*/
@@ -80,11 +84,13 @@ public class ServiceDiscoveryWorker implements
InitializingBean {
private class SdUpdateTask implements Runnable {
@Override
public void run() {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
while (!Thread.currentThread().isInterrupted()) {
try (final CollectRep.MetricsData metricsData =
dataQueue.pollServiceDiscoveryData()) {
if (metricsData == null) {
continue;
}
+ backoff.reset();
Long monitorId = metricsData.getId();
final Monitor mainMonitor =
monitorDao.findById(monitorId).orElse(null);
if (mainMonitor == null) {
@@ -156,6 +162,10 @@ public class ServiceDiscoveryWorker implements
InitializingBean {
final Set<Long> needCancelMonitorIdSet =
subMonitorBindMap.values().stream()
.map(MonitorBind::getMonitorId).collect(Collectors.toSet());
monitorService.deleteMonitors(needCancelMonitorIdSet);
+ } catch (CommonDataQueueUnknownException ue) {
+ if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+ break;
+ }
} catch (Exception exception) {
log.error(exception.getMessage(), exception);
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
index 802711065e..f0081e47ab 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java
@@ -17,7 +17,6 @@
package org.apache.hertzbeat.warehouse.store;
-import java.util.Optional;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +25,9 @@ import org.apache.hertzbeat.common.entity.log.LogEntry;
import org.apache.hertzbeat.common.entity.manager.Monitor;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
+import
org.apache.hertzbeat.common.support.exception.CommonDataQueueUnknownException;
+import org.apache.hertzbeat.common.util.BackoffUtils;
+import org.apache.hertzbeat.common.util.ExponentialBackoff;
import org.apache.hertzbeat.plugin.PostCollectPlugin;
import org.apache.hertzbeat.plugin.runner.PluginRunner;
import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
@@ -34,6 +36,8 @@ import
org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
+import java.util.Optional;
+
/**
* dispatch storage metrics data
*/
@@ -69,12 +73,14 @@ public class DataStorageDispatch {
protected void startPersistentDataStorage() {
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-persistent-data-storage");
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData =
commonDataQueue.pollMetricsDataToStorage();
if (metricsData == null) {
continue;
}
+ backoff.reset();
try {
calculateMonitorStatus(metricsData);
historyDataWriter.ifPresent(dataWriter ->
dataWriter.saveData(metricsData));
@@ -84,6 +90,10 @@ public class DataStorageDispatch {
}
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
+ } catch (CommonDataQueueUnknownException ue) {
+ if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+ break;
+ }
} catch (Exception e) {
log.error(e.getMessage(), e);
}
@@ -94,6 +104,7 @@ public class DataStorageDispatch {
protected void startLogDataStorage() {
Runnable runnable = () -> {
+ ExponentialBackoff backoff = new ExponentialBackoff(50L, 1000L);
Thread.currentThread().setName("warehouse-log-data-storage");
while (!Thread.currentThread().isInterrupted()) {
try {
@@ -101,6 +112,7 @@ public class DataStorageDispatch {
if (logEntry == null) {
continue;
}
+ backoff.reset();
historyDataWriter.ifPresent(dataWriter -> {
try {
dataWriter.saveLogData(logEntry);
@@ -110,6 +122,10 @@ public class DataStorageDispatch {
});
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
+ } catch (CommonDataQueueUnknownException ue) {
+ if (!BackoffUtils.shouldContinueAfterBackoff(backoff)) {
+ break;
+ }
} catch (Exception e) {
log.error("Error in log data storage thread: {}",
e.getMessage(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]