This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 810d54bef [improve] Optimize the scheduling logic for batch flush tasks (#3660)
810d54bef is described below

commit 810d54bef561c4e8720622691383675f94df9092
Author: Cyanty <153884653+cya...@users.noreply.github.com>
AuthorDate: Sun Aug 17 14:24:48 2025 +0800

    [improve] Optimize the scheduling logic for batch flush tasks (#3660)
    
    Signed-off-by: Cyanty <153884653+cya...@users.noreply.github.com>
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    Co-authored-by: Sherlock Yin <sherlock.yin1...@gmail.com>
    Co-authored-by: Calvin <zhengqi...@apache.org>
---
 .../tsdb/vm/VictoriaMetricsDataStorage.java        |  56 +++++++---
 .../tsdb/vm/VictoriaMetricsDataStorageTest.java    | 116 +++++++++++++--------
 2 files changed, 112 insertions(+), 60 deletions(-)

diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
index 1736b7f2f..ab304497b 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.GZIPOutputStream;
 
 import com.google.common.collect.Maps;
@@ -106,8 +107,8 @@ public class VictoriaMetricsDataStorage extends 
AbstractHistoryDataStorage {
     private final 
BlockingQueue<VictoriaMetricsDataStorage.VictoriaMetricsContent> 
metricsBufferQueue;
     
     private HashedWheelTimer metricsFlushTimer = null;
-    private MetricsFlushTask metricsFlushtask = null;
     private final VictoriaMetricsProperties.InsertConfig insertConfig;
+    private final AtomicBoolean draining = new AtomicBoolean(false);
 
     public VictoriaMetricsDataStorage(VictoriaMetricsProperties 
victoriaMetricsProperties, RestTemplate restTemplate) {
         if (victoriaMetricsProperties == null) {
@@ -129,8 +130,8 @@ public class VictoriaMetricsDataStorage extends 
AbstractHistoryDataStorage {
             thread.setDaemon(true);
             return thread;
         }, 1, TimeUnit.SECONDS, 512);
-        metricsFlushtask = new MetricsFlushTask();
-        this.metricsFlushTimer.newTimeout(metricsFlushtask, 0, 
TimeUnit.SECONDS);
+        // start flush interval timer
+        this.metricsFlushTimer.newTimeout(new MetricsFlushTask(null), 
insertConfig.flushInterval(), TimeUnit.SECONDS);
     }
 
     private boolean checkVictoriaMetricsDatasourceAvailable() {
@@ -591,36 +592,63 @@ public class VictoriaMetricsDataStorage extends 
AbstractHistoryDataStorage {
             }
         }
         // Refresh in advance to avoid waiting
-        if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8) {
+        if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8
+                && draining.compareAndSet(false, true)) {
             triggerImmediateFlush();
         }
     }
 
     private void triggerImmediateFlush() {
-        metricsFlushTimer.newTimeout(metricsFlushtask, 0, 
TimeUnit.MILLISECONDS);
+        List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch = new 
ArrayList<>(insertConfig.bufferSize());
+        metricsBufferQueue.drainTo(batch, insertConfig.bufferSize());
+        draining.set(false);
+        if (!batch.isEmpty()) {
+            metricsFlushTimer.newTimeout(new MetricsFlushTask(batch), 0, 
TimeUnit.MILLISECONDS);
+        }
     }
 
     /**
      * Regularly refresh the buffer queue to the vm
      */
     private class MetricsFlushTask implements TimerTask {
+        private final List<VictoriaMetricsDataStorage.VictoriaMetricsContent> 
batch;
+
+        public 
MetricsFlushTask(List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch) 
{
+            this.batch = batch;
+        }
+
         @Override
         public void run(Timeout timeout) {
             try {
-                List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch 
= new ArrayList<>(insertConfig.bufferSize());
-                metricsBufferQueue.drainTo(batch, insertConfig.bufferSize());
-                if (!batch.isEmpty()) {
-                    doSaveData(batch);
-                    log.debug("[Victoria Metrics] Flushed {} metrics items", 
batch.size());
-                }
-                if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) {
-                    metricsFlushTimer.newTimeout(this, 
insertConfig.flushInterval(), TimeUnit.SECONDS);
-                    log.debug("[Victoria Metrics] Rescheduled next flush task 
in {} seconds.", insertConfig.flushInterval());
+                if (batch == null) {
+                    // If the batch is null, it means that the timer is 
triggered by flush interval timer
+                    List<VictoriaMetricsDataStorage.VictoriaMetricsContent> 
batchT = new ArrayList<>(insertConfig.bufferSize());
+                    metricsBufferQueue.drainTo(batchT, 
insertConfig.bufferSize());
+                    triggerDoSaveData(batchT);
+                    // Reschedule the next flush task
+                    triggerIntervalFlushTimer();
+                } else {
+                    // If the batch is not null, it means that the timer is 
triggered by the immediate flush
+                    triggerDoSaveData(batch);
                 }
             } catch (Exception e) {
                 log.error("[VictoriaMetrics] flush task error: {}", 
e.getMessage(), e);
             }
         }
+
+        private void triggerDoSaveData(List<VictoriaMetricsContent> batch) {
+            if (!batch.isEmpty()) {
+                doSaveData(batch);
+                log.debug("[Victoria Metrics] Flushed {} metrics items", 
batch.size());
+            }
+        }
+
+        private void triggerIntervalFlushTimer() {
+            if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) {
+                metricsFlushTimer.newTimeout(new MetricsFlushTask(null), 
insertConfig.flushInterval(), TimeUnit.SECONDS);
+                log.debug("[Victoria Metrics] Rescheduled next flush task in 
{} seconds.", insertConfig.flushInterval());
+            }
+        }
     }
 
     /**
diff --git 
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
 
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
index 37957499e..f11fdb905 100644
--- 
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
+++ 
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java
@@ -21,9 +21,8 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.startsWith;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -53,6 +52,7 @@ import org.springframework.web.client.RestTemplate;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Test case for {@link VictoriaMetricsDataStorage}
@@ -72,18 +72,33 @@ class VictoriaMetricsDataStorageTest {
 
     private VictoriaMetricsDataStorage victoriaMetricsDataStorage;
 
+    private final AtomicInteger postForEntityCount = new AtomicInteger(0);
+
     @BeforeEach
     void setUp() {
         when(victoriaMetricsProperties.enabled()).thenReturn(true);
         
when(victoriaMetricsProperties.url()).thenReturn("http://localhost:8428");
         when(victoriaMetricsProperties.username()).thenReturn("root");
         when(victoriaMetricsProperties.password()).thenReturn("root");
+
         // on successful write, VictoriaMetrics returns HTTP 204 (No Content)
         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.NO_CONTENT);
-        when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), 
any(HttpEntity.class), eq(String.class)))
-                .thenReturn(responseEntity);
-        when(restTemplate.postForEntity(anyString(), any(HttpEntity.class), 
eq(String.class)))
-                .thenReturn(responseEntity);
+
+        when(restTemplate.exchange(
+                anyString(),
+                eq(HttpMethod.GET),
+                any(HttpEntity.class),
+                eq(String.class)
+        )).thenReturn(responseEntity);
+
+        when(restTemplate.postForEntity(
+                startsWith(victoriaMetricsProperties.url()),
+                any(HttpEntity.class),
+                eq(String.class)
+        )).thenAnswer(invocation -> {
+            postForEntityCount.incrementAndGet();
+            return responseEntity;
+        });
     }
 
     @Test
@@ -92,14 +107,11 @@ class VictoriaMetricsDataStorageTest {
         victoriaMetricsDataStorage = new 
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
         // execute one-time data insertion
         victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
-        // wait for the timer's first insertion task execution and verify if 
it was called once
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
-                verify(restTemplate, times(1)).postForEntity(
-                        startsWith(victoriaMetricsProperties.url()),
-                        any(HttpEntity.class),
-                        eq(String.class)
-                )
-        );
+        // wait for the timer's first insertion task execution and verify if 
it was called once (default 3 seconds)
+        Awaitility.await()
+                .pollInterval(2, TimeUnit.SECONDS)
+                .atMost(7, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertThat(postForEntityCount.get()).isEqualTo(1));
     }
 
     @Test
@@ -109,28 +121,15 @@ class VictoriaMetricsDataStorageTest {
                 10, Integer.MAX_VALUE, new 
VictoriaMetricsProperties.Compression(false)));
         victoriaMetricsDataStorage = new 
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
 
-        victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
-        // wait for the timer to execute its first insertion task
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
-                verify(restTemplate, times(1)).postForEntity(
-                        startsWith(victoriaMetricsProperties.url()),
-                        any(HttpEntity.class),
-                        eq(String.class)
-                )
-        );
-
         // triggers the buffer size insertion condition
         for (int i = 0; i < 10 * 0.8; i++) {
             victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
         }
-        // wait for the timer to execute the task again
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
-                verify(restTemplate, times(2)).postForEntity(
-                        startsWith(victoriaMetricsProperties.url()),
-                        any(HttpEntity.class),
-                        eq(String.class)
-                )
-        );
+        // wait for the timer to execute the task
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertThat(postForEntityCount.get()).isEqualTo(1));
     }
 
     @Test
@@ -142,22 +141,47 @@ class VictoriaMetricsDataStorageTest {
 
         victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
         // wait for the timer to execute its first insertion task
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
-                verify(restTemplate, times(1)).postForEntity(
-                        startsWith(victoriaMetricsProperties.url()),
-                        any(HttpEntity.class),
-                        eq(String.class)
-                )
-        );
+        Awaitility.await()
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertThat(postForEntityCount.get()).isEqualTo(1));
 
         victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
-        // wait for the flush interval to be triggered
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
-                verify(restTemplate, times(2)).postForEntity(
-                        startsWith(victoriaMetricsProperties.url()),
-                        any(),
-                        eq(String.class)
-        ));
+        // wait for the flush interval to be triggered again
+        Awaitility.await()
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertThat(postForEntityCount.get()).isEqualTo(2));
+    }
+
+    @Test
+    void testMultiThreadSaveDataBySize() {
+        int threadCount = 100;
+        int bufferSize = 10;
+        int writeSize = (int) (bufferSize * 0.8);
+
+        // verify insert process for buffer size, with the flush interval 
defined as an unreachable state
+        when(victoriaMetricsProperties.insert()).thenReturn(new 
VictoriaMetricsProperties.InsertConfig(
+                bufferSize, Integer.MAX_VALUE, new 
VictoriaMetricsProperties.Compression(false)));
+        victoriaMetricsDataStorage = new 
VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate);
+
+        for (int i = 0; i < threadCount; i++) {
+            new Thread(() -> {
+                // triggers the buffer size insertion condition
+                for (int j = 0; j < writeSize; j++) {
+                    
victoriaMetricsDataStorage.saveData(generateMockedMetricsData());
+                }
+            }).start();
+        }
+
+        // wait for the timer to execute the task
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() ->
+                        assertThat(postForEntityCount.get())
+                                // minimum flushes: ensure all data is 
processed (threadCount * writeSize / bufferSize)
+                                .isGreaterThanOrEqualTo(threadCount * 
writeSize / bufferSize));
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org
For additional commands, e-mail: notifications-h...@hertzbeat.apache.org

Reply via email to