This is an automated email from the ASF dual-hosted git repository. gongchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push: new 810d54bef [improve] Optimize the scheduling logic for batch flush tasks (#3660) 810d54bef is described below commit 810d54bef561c4e8720622691383675f94df9092 Author: Cyanty <153884653+cya...@users.noreply.github.com> AuthorDate: Sun Aug 17 14:24:48 2025 +0800 [improve] Optimize the scheduling logic for batch flush tasks (#3660) Signed-off-by: Cyanty <153884653+cya...@users.noreply.github.com> Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> Co-authored-by: Sherlock Yin <sherlock.yin1...@gmail.com> Co-authored-by: Calvin <zhengqi...@apache.org> --- .../tsdb/vm/VictoriaMetricsDataStorage.java | 56 +++++++--- .../tsdb/vm/VictoriaMetricsDataStorageTest.java | 116 +++++++++++++-------- 2 files changed, 112 insertions(+), 60 deletions(-) diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java index 1736b7f2f..ab304497b 100644 --- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java +++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.GZIPOutputStream; import com.google.common.collect.Maps; @@ -106,8 +107,8 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage { private final BlockingQueue<VictoriaMetricsDataStorage.VictoriaMetricsContent> metricsBufferQueue; private HashedWheelTimer metricsFlushTimer = null; - private MetricsFlushTask metricsFlushtask = null; private final VictoriaMetricsProperties.InsertConfig insertConfig; + private final AtomicBoolean draining = new AtomicBoolean(false); public VictoriaMetricsDataStorage(VictoriaMetricsProperties victoriaMetricsProperties, RestTemplate restTemplate) { if (victoriaMetricsProperties == null) { @@ -129,8 +130,8 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage { thread.setDaemon(true); return thread; }, 1, TimeUnit.SECONDS, 512); - metricsFlushtask = new MetricsFlushTask(); - this.metricsFlushTimer.newTimeout(metricsFlushtask, 0, TimeUnit.SECONDS); + // start flush interval timer + this.metricsFlushTimer.newTimeout(new MetricsFlushTask(null), insertConfig.flushInterval(), TimeUnit.SECONDS); } private boolean checkVictoriaMetricsDatasourceAvailable() { @@ -591,36 +592,63 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage { } } // Refresh in advance to avoid waiting - if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8) { + if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8 + && draining.compareAndSet(false, true)) { triggerImmediateFlush(); } } private void triggerImmediateFlush() { - metricsFlushTimer.newTimeout(metricsFlushtask, 0, TimeUnit.MILLISECONDS); + List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch = new ArrayList<>(insertConfig.bufferSize()); + metricsBufferQueue.drainTo(batch, insertConfig.bufferSize()); + draining.set(false); + if (!batch.isEmpty()) { + metricsFlushTimer.newTimeout(new MetricsFlushTask(batch), 0, TimeUnit.MILLISECONDS); + } } /** * Regularly refresh the buffer queue to the vm */ private class MetricsFlushTask implements TimerTask { + private final List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch; + + public MetricsFlushTask(List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch) { + this.batch = batch; + } + @Override public void run(Timeout timeout) { try { - List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batch = new ArrayList<>(insertConfig.bufferSize()); - metricsBufferQueue.drainTo(batch, insertConfig.bufferSize()); - if (!batch.isEmpty()) { - doSaveData(batch); - log.debug("[Victoria Metrics] Flushed {} metrics items", batch.size()); - } - if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) { - metricsFlushTimer.newTimeout(this, insertConfig.flushInterval(), TimeUnit.SECONDS); - log.debug("[Victoria Metrics] Rescheduled next flush task in {} seconds.", insertConfig.flushInterval()); + if (batch == null) { + // If the batch is null, it means that the timer is triggered by flush interval timer + List<VictoriaMetricsDataStorage.VictoriaMetricsContent> batchT = new ArrayList<>(insertConfig.bufferSize()); + metricsBufferQueue.drainTo(batchT, insertConfig.bufferSize()); + triggerDoSaveData(batchT); + // Reschedule the next flush task + triggerIntervalFlushTimer(); + } else { + // If the batch is not null, it means that the timer is triggered by the immediate flush + triggerDoSaveData(batch); } } catch (Exception e) { log.error("[VictoriaMetrics] flush task error: {}", e.getMessage(), e); } } + + private void triggerDoSaveData(List<VictoriaMetricsContent> batch) { + if (!batch.isEmpty()) { + doSaveData(batch); + log.debug("[Victoria Metrics] Flushed {} metrics items", batch.size()); + } + } + + private void triggerIntervalFlushTimer() { + if (metricsFlushTimer != null && !metricsFlushTimer.isStop()) { + metricsFlushTimer.newTimeout(new MetricsFlushTask(null), insertConfig.flushInterval(), TimeUnit.SECONDS); + log.debug("[Victoria Metrics] Rescheduled next flush task in {} seconds.", insertConfig.flushInterval()); + } + } } /** diff --git a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java index 37957499e..f11fdb905 100644 --- a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java +++ b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorageTest.java @@ -21,9 +21,8 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.startsWith; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; +import static org.assertj.core.api.Assertions.assertThat; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -53,6 +52,7 @@ import org.springframework.web.client.RestTemplate; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Test case for {@link VictoriaMetricsDataStorage} @@ -72,18 +72,33 @@ class VictoriaMetricsDataStorageTest { private VictoriaMetricsDataStorage victoriaMetricsDataStorage; + private final AtomicInteger postForEntityCount = new AtomicInteger(0); + @BeforeEach void setUp() { when(victoriaMetricsProperties.enabled()).thenReturn(true); when(victoriaMetricsProperties.url()).thenReturn("http://localhost:8428"); when(victoriaMetricsProperties.username()).thenReturn("root"); when(victoriaMetricsProperties.password()).thenReturn("root"); + // on successful write, VictoriaMetrics returns HTTP 204 (No Content) when(responseEntity.getStatusCode()).thenReturn(HttpStatus.NO_CONTENT); - when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), any(HttpEntity.class), eq(String.class))) - .thenReturn(responseEntity); - when(restTemplate.postForEntity(anyString(), any(HttpEntity.class), eq(String.class))) - .thenReturn(responseEntity); + + when(restTemplate.exchange( + anyString(), + eq(HttpMethod.GET), + any(HttpEntity.class), + eq(String.class) + )).thenReturn(responseEntity); + + when(restTemplate.postForEntity( + startsWith(victoriaMetricsProperties.url()), + any(HttpEntity.class), + eq(String.class) + )).thenAnswer(invocation -> { + postForEntityCount.incrementAndGet(); + return responseEntity; + }); } @Test @@ -92,14 +107,11 @@ class VictoriaMetricsDataStorageTest { victoriaMetricsDataStorage = new VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate); // execute one-time data insertion victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); - // wait for the timer's first insertion task execution and verify if it was called once - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> - verify(restTemplate, times(1)).postForEntity( - startsWith(victoriaMetricsProperties.url()), - any(HttpEntity.class), - eq(String.class) - ) - ); + // wait for the timer's first insertion task execution and verify if it was called once (default 3 seconds) + Awaitility.await() + .pollInterval(2, TimeUnit.SECONDS) + .atMost(7, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(postForEntityCount.get()).isEqualTo(1)); } @Test @@ -109,28 +121,15 @@ class VictoriaMetricsDataStorageTest { 10, Integer.MAX_VALUE, new VictoriaMetricsProperties.Compression(false))); victoriaMetricsDataStorage = new VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate); - victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); - // wait for the timer to execute its first insertion task - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> - verify(restTemplate, times(1)).postForEntity( - startsWith(victoriaMetricsProperties.url()), - any(HttpEntity.class), - eq(String.class) - ) - ); - // triggers the buffer size insertion condition for (int i = 0; i < 10 * 0.8; i++) { victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); } - // wait for the timer to execute the task again - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> - verify(restTemplate, times(2)).postForEntity( - startsWith(victoriaMetricsProperties.url()), - any(HttpEntity.class), - eq(String.class) - ) - ); + // wait for the timer to execute the task + Awaitility.await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(postForEntityCount.get()).isEqualTo(1)); } @Test @@ -142,22 +141,47 @@ class VictoriaMetricsDataStorageTest { victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); // wait for the timer to execute its first insertion task - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> - verify(restTemplate, times(1)).postForEntity( - startsWith(victoriaMetricsProperties.url()), - any(HttpEntity.class), - eq(String.class) - ) - ); + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(postForEntityCount.get()).isEqualTo(1)); victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); - // wait for the flush interval to be triggered - Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> - verify(restTemplate, times(2)).postForEntity( - startsWith(victoriaMetricsProperties.url()), - any(), - eq(String.class) - )); + // wait for the flush interval to be triggered again + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(postForEntityCount.get()).isEqualTo(2)); + } + + @Test + void testMultiThreadSaveDataBySize() { + int threadCount = 100; + int bufferSize = 10; + int writeSize = (int) (bufferSize * 0.8); + + // verify insert process for buffer size, with the flush interval defined as an unreachable state + when(victoriaMetricsProperties.insert()).thenReturn(new VictoriaMetricsProperties.InsertConfig( + bufferSize, Integer.MAX_VALUE, new VictoriaMetricsProperties.Compression(false))); + victoriaMetricsDataStorage = new VictoriaMetricsDataStorage(victoriaMetricsProperties, restTemplate); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + // triggers the buffer size insertion condition + for (int j = 0; j < writeSize; j++) { + victoriaMetricsDataStorage.saveData(generateMockedMetricsData()); + } + }).start(); + } + + // wait for the timer to execute the task + Awaitility.await() + .pollInterval(3, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) + .untilAsserted(() -> + assertThat(postForEntityCount.get()) + // minimum flushes: ensure all data is processed (threadCount * writeSize / bufferSize) + .isGreaterThanOrEqualTo(threadCount * writeSize / bufferSize)); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org For additional commands, e-mail: notifications-h...@hertzbeat.apache.org