This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new b3dba8b3cf [feat] Support Apache Doris as time-series storage for both
metrics and logs (#4031)
b3dba8b3cf is described below
commit b3dba8b3cf814740688caf9f8f03e069389c541a
Author: LunaRain_079 <[email protected]>
AuthorDate: Mon Mar 2 23:02:51 2026 +0800
[feat] Support Apache Doris as time-series storage for both metrics and
logs (#4031)
---
.../src/main/resources/application.yml | 32 +
hertzbeat-warehouse/pom.xml | 13 +
.../warehouse/constants/WarehouseConstants.java | 2 +
.../store/history/tsdb/doris/DorisDataStorage.java | 1244 ++++++++++++++++++++
.../store/history/tsdb/doris/DorisMetricRow.java | 46 +
.../store/history/tsdb/doris/DorisProperties.java | 235 ++++
.../history/tsdb/doris/DorisStreamLoadWriter.java | 603 ++++++++++
.../history/tsdb/doris/DorisDataStorageTest.java | 314 +++++
home/docs/start/doris-init.md | 222 ++++
.../current/start/doris-init.md | 221 ++++
10 files changed, 2932 insertions(+)
diff --git a/hertzbeat-startup/src/main/resources/application.yml
b/hertzbeat-startup/src/main/resources/application.yml
index 85ff831d59..17a70b2c08 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -225,6 +225,38 @@ warehouse:
password: root
expire-time: '30d'
replication: 1
+ doris:
+ enabled: false
+ url: jdbc:mysql://127.0.0.1:9030
+ username: root
+ password:
+ table-config:
+ enable-partition: false
+ partition-time-unit: DAY
+ partition-retention-days: 30
+ partition-future-days: 3
+ buckets: 8
+ replication-num: 1
+ pool-config:
+ minimum-idle: 5
+ maximum-pool-size: 20
+ connection-timeout: 30000
+ write-config:
+ # Write mode: jdbc (default, suitable for small/medium scale) or
stream (high throughput)
+ write-mode: jdbc
+ # JDBC mode: batch size and flush interval
+ batch-size: 1000
+ flush-interval: 5
+ # Stream Load mode configuration (only used when write-mode: stream)
+ stream-load-config:
+ # Doris FE HTTP port for Stream Load API
+ http-port: :8030
+ # Stream load timeout in seconds
+ timeout: 60
+ # Max batch size in bytes for stream load (10MB default)
+ max-bytes-per-batch: 10485760
+ # Redirect policy in complex networks: direct/public/private, empty
means Doris default
+ redirect-policy: ""
# store real-time metrics data, enable only one below
real-time:
memory:
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index b2d5e2ac53..e65b30d536 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -119,6 +119,19 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- doris -->
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index 45ad3f038f..db73381be6 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -48,6 +48,8 @@ public interface WarehouseConstants {
String QUEST_DB = "questdb";
String DUCKDB = "duckdb";
+
+ String DORIS = "doris";
}
/**
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
new file mode 100644
index 0000000000..22bcd46827
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
+import
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Apache Doris data storage.
+ * Supports two write modes:
+ * - jdbc: Traditional JDBC batch insert (default, suitable for small to
medium scale)
+ * - stream: HTTP Stream Load API (high throughput, suitable for large scale)
+ */
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled",
havingValue = "true")
+@Slf4j
+public class DorisDataStorage extends AbstractHistoryDataStorage {
+
+ private static final String DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
+ private static final String DATABASE_NAME = "hertzbeat";
+ private static final String TABLE_NAME = "hzb_history";
+ private static final String LOG_TABLE_NAME = "hzb_log";
+ private static final String WRITE_MODE_JDBC = "jdbc";
+ private static final String WRITE_MODE_STREAM = "stream";
+
+ /**
+ * Metric type constants
+ */
+ private static final byte METRIC_TYPE_NUMBER = 1;
+ private static final byte METRIC_TYPE_STRING = 2;
+ private static final byte METRIC_TYPE_TIME = 3;
+
+ private static final String LABEL_KEY_START_TIME = "start";
+ private static final String LABEL_KEY_END_TIME = "end";
+ private static final int MAX_QUERY_LIMIT = 20000;
+ private static final int LOG_BATCH_SIZE = 1000;
+ private static final long NANOS_PER_MILLISECOND = 1_000_000L;
+ private static final long MAX_WAIT_MS = 500L;
+ private static final int MAX_RETRIES = 3;
+ private static final String METRIC_BLOOM_FILTER_COLUMNS =
"instance,app,metrics,metric";
+ private static final String LOG_BLOOM_FILTER_COLUMNS =
"trace_id,span_id,severity_number,severity_text";
+ private static final String CREATE_METRIC_TABLE_HEADER_TEMPLATE = """
+ CREATE TABLE IF NOT EXISTS %s (
+ instance VARCHAR(128) COMMENT 'Monitor
instance address',
+ app VARCHAR(64) COMMENT 'Monitor
application type',
+ metrics VARCHAR(128) COMMENT 'Metrics set
name',
+ metric VARCHAR(128) COMMENT 'Metric name',
+ record_time DATETIME COMMENT 'Record time',
+ metric_type TINYINT COMMENT 'Metric type:
1-number, 2-string, 3-time',
+ int32_value INT COMMENT 'Integer value',
+ double_value DOUBLE COMMENT 'Double value',
+ str_value VARCHAR(65533) COMMENT 'String value',
+ labels VARCHAR(%d) COMMENT 'Labels JSON'
+ ) DUPLICATE KEY(instance, app, metrics, metric, record_time)
+ """;
+ private static final String CREATE_LOG_TABLE_HEADER_TEMPLATE = """
+ CREATE TABLE IF NOT EXISTS %s (
+ time_unix_nano BIGINT COMMENT 'event unix
time in nanoseconds',
+ trace_id VARCHAR(64) COMMENT 'trace id',
+ span_id VARCHAR(32) COMMENT 'span id',
+ event_time DATETIME COMMENT 'event time for
partition and query',
+ observed_time_unix_nano BIGINT COMMENT 'observed unix
time in nanoseconds',
+ severity_number INT COMMENT 'severity
number',
+ severity_text VARCHAR(32) COMMENT 'severity text',
+ body VARCHAR(65533) COMMENT 'log body json',
+ trace_flags INT COMMENT 'trace flags',
+ attributes VARCHAR(65533) COMMENT 'log attributes
json',
+ resource VARCHAR(65533) COMMENT 'resource json',
+ instrumentation_scope VARCHAR(%d) COMMENT 'instrumentation
scope json',
+ dropped_attributes_count INT COMMENT 'dropped
attributes count'
+ ) DUPLICATE KEY(time_unix_nano, trace_id, span_id, event_time)
+ """;
+ private static final String CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE = """
+ PARTITION BY RANGE(%s) ()
+ DISTRIBUTED BY HASH(%s) BUCKETS %d
+ PROPERTIES (
+ "replication_num" = "%d",
+ "bloom_filter_columns" = "%s",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "%s",
+ "dynamic_partition.end" = "%d",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "%d",
+ "dynamic_partition.history_partition_num" = "%d"
+ )
+ """;
+ private static final String CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE =
"""
+ DISTRIBUTED BY HASH(%s) BUCKETS %d
+ PROPERTIES (
+ "replication_num" = "%d",
+ "bloom_filter_columns" = "%s"
+ )
+ """;
+
+ private final DorisProperties properties;
+ private HikariDataSource dataSource;
+
+ private final BlockingQueue<DorisMetricRow> metricsBufferQueue;
+ private final DorisProperties.WriteConfig writeConfig;
+ private final WarehouseWorkerPool warehouseWorkerPool;
+ private String writeMode;
+ private final AtomicBoolean draining = new AtomicBoolean(false);
+ private volatile boolean flushThreadRunning = true;
+ private volatile boolean flushTaskStarted;
+ private final CountDownLatch flushTaskStopped = new CountDownLatch(1);
+
+ // Stream Load writer (only used when writeMode is "stream")
+ private DorisStreamLoadWriter streamLoadWriter;
+ private DorisStreamLoadWriter logStreamLoadWriter;
+
/**
 * Builds the Doris storage: validates config beans, sizes the in-memory metrics
 * buffer, opens the JDBC pool and ensures database/tables exist, then (if
 * configured) initializes Stream Load writers and starts the background flush task.
 *
 * @param dorisProperties     warehouse Doris configuration (must be non-null)
 * @param warehouseWorkerPool shared worker pool used for async flush jobs
 * @throws IllegalArgumentException when either dependency is missing
 */
public DorisDataStorage(DorisProperties dorisProperties, WarehouseWorkerPool warehouseWorkerPool) {
    if (dorisProperties == null) {
        log.error("[Doris] Init error, please config Warehouse Doris props in application.yml");
        throw new IllegalArgumentException("please config Warehouse Doris props");
    }
    if (warehouseWorkerPool == null) {
        throw new IllegalArgumentException("please config WarehouseWorkerPool bean");
    }
    this.properties = dorisProperties;
    this.warehouseWorkerPool = warehouseWorkerPool;
    this.writeConfig = dorisProperties.writeConfig();
    this.writeMode = writeConfig.writeMode();
    // Bounded queue: 10x one batch, so producers back-pressure instead of growing unbounded.
    this.metricsBufferQueue = new LinkedBlockingQueue<>(writeConfig.batchSize() * 10);

    serverAvailable = initDorisConnection();
    if (serverAvailable) {
        // Initialize Stream Load writer if using stream mode
        if (WRITE_MODE_STREAM.equals(writeMode)) {
            initStreamLoadWriter();
        }
        startFlushThread();
    }

    // NOTE: writeMode may have been downgraded to "jdbc" by initStreamLoadWriter().
    log.info("[Doris] Initialized with write mode: {}", writeMode);
}
+
/**
 * Initialize Doris connection and table in one go.
 * Steps: load the MySQL driver, create the hertzbeat database via a one-off
 * connection, build the Hikari pool against that database, then create the
 * metric and log tables (partitioned or not per table-config).
 *
 * @return true when the pool and both tables are ready, false on any failure
 */
private boolean initDorisConnection() {
    try {
        Class.forName(DRIVER_NAME);

        DorisProperties.PoolConfig poolConfig = properties.poolConfig();
        String baseUrl = properties.url();

        // Step 1: First connect without database to create it if needed
        try (Connection initConn = java.sql.DriverManager.getConnection(
                baseUrl, properties.username(), properties.password());
                Statement stmt = initConn.createStatement()) {
            // Create database if not exists
            stmt.execute("CREATE DATABASE IF NOT EXISTS " + DATABASE_NAME);
            log.info("[Doris] Database {} ensured", DATABASE_NAME);
        }

        // Step 2: Build URL with database
        String urlWithDb = baseUrl.endsWith("/") ? baseUrl + DATABASE_NAME : baseUrl + "/" + DATABASE_NAME;

        // Step 3: Create connection pool with database
        HikariConfig config = getHikariConfig(urlWithDb, poolConfig);

        this.dataSource = new HikariDataSource(config);

        // Step 4: Create table
        try (Connection conn = dataSource.getConnection();
                Statement stmt = conn.createStatement()) {
            DorisProperties.TableConfig tableConfig = properties.tableConfig();

            String metricTableHeader = CREATE_METRIC_TABLE_HEADER_TEMPLATE
                    .formatted(TABLE_NAME, tableConfig.strColumnMaxLength());
            // Partitioned variant uses dynamic partitions on record_time; both variants
            // hash-distribute on (instance, app, metrics).
            String metricTableSuffix = tableConfig.enablePartition()
                    ? CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE.formatted(
                            "record_time",
                            "instance, app, metrics",
                            tableConfig.buckets(),
                            tableConfig.replicationNum(),
                            METRIC_BLOOM_FILTER_COLUMNS,
                            tableConfig.partitionTimeUnit(),
                            tableConfig.partitionFutureDays(),
                            tableConfig.buckets(),
                            tableConfig.partitionRetentionDays())
                    : CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE.formatted(
                            "instance, app, metrics",
                            tableConfig.buckets(),
                            tableConfig.replicationNum(),
                            METRIC_BLOOM_FILTER_COLUMNS);

            String logTableHeader = CREATE_LOG_TABLE_HEADER_TEMPLATE
                    .formatted(LOG_TABLE_NAME, tableConfig.strColumnMaxLength());
            // Log table partitions on event_time and distributes on time_unix_nano.
            String logTableSuffix = tableConfig.enablePartition()
                    ? CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE.formatted(
                            "event_time",
                            "time_unix_nano",
                            tableConfig.buckets(),
                            tableConfig.replicationNum(),
                            LOG_BLOOM_FILTER_COLUMNS,
                            tableConfig.partitionTimeUnit(),
                            tableConfig.partitionFutureDays(),
                            tableConfig.buckets(),
                            tableConfig.partitionRetentionDays())
                    : CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE.formatted(
                            "time_unix_nano",
                            tableConfig.buckets(),
                            tableConfig.replicationNum(),
                            LOG_BLOOM_FILTER_COLUMNS);

            stmt.execute(metricTableHeader + metricTableSuffix);
            log.info("[Doris] Table {} ensured", TABLE_NAME);
            stmt.execute(logTableHeader + logTableSuffix);
            log.info("[Doris] Table {} ensured", LOG_TABLE_NAME);
        }

        log.info("[Doris] Initialized: {}", urlWithDb);
        return true;
    } catch (ClassNotFoundException e) {
        log.error("[Doris] MySQL JDBC driver not found.", e);
    } catch (Exception e) {
        log.error("[Doris] Failed to initialize: {}", e.getMessage(), e);
    }
    return false;
}
+
+ @NotNull
+ private HikariConfig getHikariConfig(String urlWithDb,
DorisProperties.PoolConfig poolConfig) {
+ HikariConfig config = new HikariConfig();
+ config.setJdbcUrl(urlWithDb);
+ config.setUsername(properties.username());
+ config.setPassword(properties.password());
+ config.setDriverClassName(DRIVER_NAME);
+ config.setMinimumIdle(poolConfig.minimumIdle());
+ config.setMaximumPoolSize(poolConfig.maximumPoolSize());
+ config.setConnectionTimeout(poolConfig.connectionTimeout());
+ config.setMaxLifetime(poolConfig.maxLifetime());
+ config.setIdleTimeout(poolConfig.idleTimeout());
+ config.setConnectionTestQuery("SELECT 1");
+ config.setPoolName("Doris-HikariCP");
+ return config;
+ }
+
/**
 * Initialize Stream Load writer.
 * Creates one writer for metrics and one for logs. If the metrics writer is
 * unavailable the whole storage falls back to JDBC mode; if only the log writer
 * is unavailable, log writes fall back to JDBC while metrics keep stream mode.
 */
private void initStreamLoadWriter() {
    try {
        this.streamLoadWriter = new DorisStreamLoadWriter(
                DATABASE_NAME, TABLE_NAME, properties.url(),
                properties.username(), properties.password(),
                writeConfig.streamLoadConfig()
        );
        if (streamLoadWriter.isAvailable()) {
            log.info("[Doris] Stream Load writer initialized.");
        } else {
            // Metrics writer unusable: downgrade the global write mode.
            log.warn("[Doris] Stream Load writer failed, fallback to JDBC.");
            this.writeMode = WRITE_MODE_JDBC;
        }

        this.logStreamLoadWriter = new DorisStreamLoadWriter(
                DATABASE_NAME, LOG_TABLE_NAME, properties.url(),
                properties.username(), properties.password(),
                writeConfig.streamLoadConfig()
        );
        if (logStreamLoadWriter.isAvailable()) {
            log.info("[Doris] Log Stream Load writer initialized.");
        } else {
            // Null signals "use JDBC for log writes" to the callers.
            log.warn("[Doris] Log Stream Load writer unavailable, log writes fallback to JDBC.");
            this.logStreamLoadWriter = null;
        }
    } catch (Exception e) {
        log.error("[Doris] Stream Load init failed: {}", e.getMessage(), e);
        this.writeMode = WRITE_MODE_JDBC;
    }
}
+
/**
 * Start the background flush task.
 * The task loops while the storage is running (or until the buffer drains after
 * shutdown), waiting up to flush-interval seconds for the first row, then
 * draining up to one batch and writing it. Interruption stops the loop;
 * flushTaskStopped is always counted down so shutdown can await termination.
 */
private void startFlushThread() {
    try {
        warehouseWorkerPool.executeJob(() -> {
            flushTaskStarted = true;
            try {
                // Keep draining after shutdown is signalled until the buffer is empty.
                while (flushThreadRunning || !metricsBufferQueue.isEmpty()) {
                    try {
                        List<DorisMetricRow> batch = new ArrayList<>(writeConfig.batchSize());

                        // Wait for data or timeout
                        DorisMetricRow first = metricsBufferQueue.poll(writeConfig.flushInterval(), TimeUnit.SECONDS);
                        if (first != null) {
                            batch.add(first);
                            // Drain remaining items up to batch size
                            metricsBufferQueue.drainTo(batch, writeConfig.batchSize() - 1);
                        }

                        if (!batch.isEmpty()) {
                            doSaveData(batch);
                            log.debug("[Doris] Flushed {} metrics items", batch.size());
                        }

                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and exit the flush loop.
                        Thread.currentThread().interrupt();
                        log.debug("[Doris] Flush task interrupted");
                        break;
                    } catch (Exception e) {
                        // A failed batch must not kill the flush loop.
                        log.error("[Doris] Flush task error: {}", e.getMessage(), e);
                    }
                }
                log.info("[Doris] Flush task stopped");
            } finally {
                flushTaskStopped.countDown();
            }
        });
        log.info("[Doris] Started metrics flush task with interval {} seconds", writeConfig.flushInterval());
    } catch (RejectedExecutionException e) {
        log.error("[Doris] Failed to start flush task from WarehouseWorkerPool", e);
    }
}
+
+ private String normalizeApp(String app) {
+ if (app != null &&
app.startsWith(CommonConstants.PROMETHEUS_APP_PREFIX)) {
+ return CommonConstants.PROMETHEUS;
+ }
+ return app;
+ }
+
/**
 * Converts one collected MetricsData into per-field DorisMetricRow entries and
 * enqueues them for asynchronous persistence. Each non-label cell becomes one
 * row; label cells (plus the task's custom labels) are serialized into the
 * row's labels JSON. Unparseable numeric/time values are skipped.
 *
 * @param metricsData collected metrics; ignored unless code == SUCCESS and values exist
 */
@Override
public void saveData(CollectRep.MetricsData metricsData) {
    if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) {
        return;
    }
    if (metricsData.getValues().isEmpty()) {
        log.info("[Doris] Flush metrics data {} {} is null, ignore.", metricsData.getId(),
                metricsData.getMetrics());
        return;
    }

    String instance = metricsData.getInstance();
    String app = normalizeApp(metricsData.getApp());
    String metrics = metricsData.getMetrics();
    long timestamp = metricsData.getTime();

    Map<String, String> customLabels = metricsData.getLabels();

    List<DorisMetricRow> rows = new ArrayList<>();

    try {
        RowWrapper rowWrapper = metricsData.readRow();
        while (rowWrapper.hasNextRow()) {
            rowWrapper = rowWrapper.nextRow();

            Map<String, String> rowLabels = new HashMap<>();

            // Add custom labels
            if (customLabels != null && !customLabels.isEmpty()) {
                customLabels.forEach((k, v) -> rowLabels.put(k, String.valueOf(v)));
            }

            // Materialize the cell stream once; it is iterated twice below.
            var cells = rowWrapper.cellStream().toList();

            // Collect all labels in this row first, then build metrics rows.
            for (var cell : cells) {
                String value = cell.getValue();
                if (CommonConstants.NULL_VALUE.equals(value)) {
                    continue;
                }
                boolean isLabel = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
                if (!isLabel) {
                    continue;
                }
                String fieldName = cell.getField().getName();
                rowLabels.put(fieldName, value);
            }

            // Second pass: every non-label, non-null cell becomes one metric row.
            for (var cell : cells) {
                String value = cell.getValue();
                if (CommonConstants.NULL_VALUE.equals(value)) {
                    continue;
                }

                boolean isLabel = cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
                if (isLabel) {
                    continue;
                }
                byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
                String fieldName = cell.getField().getName();

                // Create a metric row for each non-label field.
                DorisMetricRow row = new DorisMetricRow();
                row.instance = instance;
                row.app = app;
                row.metrics = metrics;
                row.metric = fieldName;
                row.recordTime = new Timestamp(timestamp);
                row.labels = JsonUtil.toJson(rowLabels);

                if (type == CommonConstants.TYPE_NUMBER) {
                    row.metricType = METRIC_TYPE_NUMBER;
                    try {
                        row.doubleValue = Double.parseDouble(value);
                    } catch (NumberFormatException e) {
                        log.debug("[Doris] Failed to parse number value: {}", value);
                        continue;
                    }
                } else if (type == CommonConstants.TYPE_STRING) {
                    row.metricType = METRIC_TYPE_STRING;
                    row.strValue = value;
                } else if (type == CommonConstants.TYPE_TIME) {
                    row.metricType = METRIC_TYPE_TIME;
                    try {
                        row.int32Value = Integer.parseInt(value);
                    } catch (NumberFormatException e) {
                        log.debug("[Doris] Failed to parse time value: {}", value);
                        continue;
                    }
                } else {
                    // Unknown types are stored as strings rather than dropped.
                    row.metricType = METRIC_TYPE_STRING;
                    row.strValue = value;
                }

                rows.add(row);
            }
        }

    } catch (Exception e) {
        log.error("[Doris] Error processing metrics data: {}", e.getMessage(), e);
        return;
    }

    if (rows.isEmpty()) {
        log.debug("[Doris] No valid metrics data to save for {} {}", app, metrics);
        return;
    }

    sendToBuffer(rows);
}
+
/**
 * Send metrics to buffer queue.
 * Offers each row with a bounded wait; on the first rejection it triggers an
 * immediate async flush (guarded by the draining flag) and retries with linear
 * backoff. If a row still cannot be queued after MAX_RETRIES, the remaining
 * rows are written synchronously to avoid data loss.
 *
 * @param rows metric rows to enqueue, in order
 */
private void sendToBuffer(List<DorisMetricRow> rows) {
    for (int idx = 0; idx < rows.size(); idx++) {
        DorisMetricRow row = rows.get(idx);
        boolean offered = false;
        int retryCount = 0;
        while (!offered && retryCount < MAX_RETRIES) {
            try {
                offered = metricsBufferQueue.offer(row, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
                if (!offered) {
                    // Only one caller at a time may kick off a drain.
                    if (retryCount == 0 && draining.compareAndSet(false, true)) {
                        log.debug("[Doris] Buffer queue is full, triggering immediate flush");
                        triggerImmediateFlush();
                    }
                    retryCount++;
                    if (retryCount < MAX_RETRIES) {
                        // Linear backoff before the next offer attempt.
                        Thread.sleep(100L * retryCount);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("[Doris] Interrupted while offering metrics to buffer queue", e);
                break;
            }
        }
        if (!offered) {
            log.warn("[Doris] Failed to add metrics to buffer after {} retries, saving directly", MAX_RETRIES);
            // Save only rows that are not successfully queued yet to avoid duplicate writes.
            List<DorisMetricRow> remainingRows = new ArrayList<>(rows.subList(idx, rows.size()));
            doSaveData(remainingRows);
            return;
        }
    }

    // Trigger early flush if buffer is almost full
    if (metricsBufferQueue.size() >= writeConfig.batchSize() * 8 && draining.compareAndSet(false, true)) {
        triggerImmediateFlush();
    }
}
+
/**
 * Trigger immediate flush.
 * Drains up to one batch from the buffer and persists it on a worker thread.
 * Callers must have set the draining flag before invoking; this method clears
 * it on every exit path (empty drain, async completion, or sync fallback).
 */
private void triggerImmediateFlush() {
    List<DorisMetricRow> batch = new ArrayList<>(writeConfig.batchSize());
    metricsBufferQueue.drainTo(batch, writeConfig.batchSize());
    if (batch.isEmpty()) {
        draining.set(false);
        return;
    }
    try {
        warehouseWorkerPool.executeJob(() -> {
            try {
                doSaveData(batch);
            } finally {
                draining.set(false);
            }
        });
    } catch (RejectedExecutionException e) {
        // Pool saturated: flush synchronously on the caller's thread instead.
        try {
            log.warn("[Doris] Immediate flush task rejected by WarehouseWorkerPool, fallback to sync flush");
            doSaveData(batch);
        } finally {
            draining.set(false);
        }
    }
}
+
+ /**
+ * Actually save data to Doris
+ * Uses either JDBC batch insert or Stream Load based on writeMode
configuration
+ */
+ private void doSaveData(List<DorisMetricRow> rows) {
+ if (rows == null || rows.isEmpty()) {
+ return;
+ }
+
+ if (WRITE_MODE_STREAM.equals(writeMode) && streamLoadWriter != null) {
+ boolean success = streamLoadWriter.write(rows);
+ if (!success) {
+ if (writeConfig.fallbackToJdbcOnFailure()) {
+ log.warn("[Doris] Stream Load failed,
fallbackToJdbcOnFailure=true, use JDBC for this batch");
+ doSaveDataJdbc(rows);
+ } else {
+ log.error("[Doris] Stream Load failed and JDBC fallback is
disabled. rows={}", rows.size());
+ }
+ }
+ } else {
+ // Use JDBC batch insert (default)
+ doSaveDataJdbc(rows);
+ }
+ }
+
/**
 * Save data using JDBC batch insert.
 * Inserts all rows in a single transaction via a prepared batch; rolls back and
 * logs on SQL failure. Nullable int/double columns are bound with setNull.
 *
 * @param rows metric rows to persist, may be null or empty (no-op)
 */
private void doSaveDataJdbc(List<DorisMetricRow> rows) {
    if (rows == null || rows.isEmpty()) {
        return;
    }

    String insertSql = """
            INSERT INTO %s.%s (instance, app, metrics, metric, metric_type, int32_value, double_value, str_value, record_time, labels)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
            .formatted(DATABASE_NAME, TABLE_NAME);

    try (Connection conn = dataSource.getConnection()) {
        // One transaction per batch so a failure leaves no partial writes.
        conn.setAutoCommit(false);

        try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
            for (DorisMetricRow row : rows) {
                pstmt.setString(1, row.instance);
                pstmt.setString(2, row.app);
                pstmt.setString(3, row.metrics);
                pstmt.setString(4, row.metric);
                pstmt.setByte(5, row.metricType);

                if (row.int32Value != null) {
                    pstmt.setInt(6, row.int32Value);
                } else {
                    pstmt.setNull(6, java.sql.Types.INTEGER);
                }

                if (row.doubleValue != null) {
                    pstmt.setDouble(7, row.doubleValue);
                } else {
                    pstmt.setNull(7, java.sql.Types.DOUBLE);
                }

                pstmt.setString(8, row.strValue);
                pstmt.setTimestamp(9, row.recordTime);
                pstmt.setString(10, row.labels);

                pstmt.addBatch();
            }

            pstmt.executeBatch();
            conn.commit();
            log.debug("[Doris] Successfully saved {} metrics rows", rows.size());

        } catch (SQLException e) {
            conn.rollback();
            throw e;
        }
    } catch (SQLException e) {
        log.error("[Doris] Failed to save metrics data: {}", e.getMessage(), e);
    }
}
+
+ @Override
+ public Map<String, List<Value>> getHistoryMetricData(String instance,
String app, String metrics, String metric,
+ String history) {
+ if (!isServerAvailable()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, Long> timeRange = getTimeRange(history);
+ app = normalizeApp(app);
+ Long start = timeRange.get(LABEL_KEY_START_TIME);
+ Long end = timeRange.get(LABEL_KEY_END_TIME);
+
+ return queryMetricData(instance, app, metrics, metric, start, end);
+ }
+
/**
 * Fetches history values plus per-bucket max/min/mean aggregations.
 * Raw values are loaded first; aggregation failures are logged and the raw
 * values are still returned without max/min/mean set.
 *
 * @param instance monitor instance address
 * @param app      monitor application type (normalized before querying)
 * @param metrics  metrics set name
 * @param metric   metric field name
 * @param history  history window token (e.g. "6h") or seconds as a number
 * @return labels-JSON -> values with aggregations where available
 */
@Override
public Map<String, List<Value>> getHistoryIntervalMetricData(String instance, String app, String metrics,
        String metric, String history) {
    if (!isServerAvailable()) {
        log.error("""
                
                \t---------------Doris Init Failed---------------
                \t--------------Please Config Doris--------------
                \t----------Can Not Use Metric History Now----------
                """);
        return Collections.emptyMap();
    }

    Map<String, Long> timeRange = getTimeRange(history);
    app = normalizeApp(app);
    Long start = timeRange.get(LABEL_KEY_START_TIME);
    Long end = timeRange.get(LABEL_KEY_END_TIME);

    // First get the basic data
    Map<String, List<Value>> instanceValuesMap = queryMetricData(instance, app, metrics, metric, start, end);

    if (instanceValuesMap.isEmpty()) {
        return Collections.emptyMap();
    }

    // Calculate aggregations for each label group
    try {
        queryAndSetAggregations(instanceValuesMap, instance, app, metrics, metric, start, end);
    } catch (Exception e) {
        // Aggregations are best-effort; raw values are still returned.
        log.error("[Doris] Error calculating aggregations: {}", e.getMessage(), e);
    }

    return instanceValuesMap;
}
+
/**
 * Query metric data from Doris.
 * Reads up to MAX_QUERY_LIMIT most-recent rows in [startTime, endTime]
 * (epoch seconds) and groups the typed values by their labels JSON. Numeric
 * values are rounded to 4 decimal places; rows with NULL payloads are skipped.
 *
 * @param startTime range start, epoch seconds inclusive
 * @param endTime   range end, epoch seconds inclusive
 * @return labels-JSON ("{}" when null) -> values in descending record_time order
 */
private Map<String, List<Value>> queryMetricData(String instance, String app, String metrics, String metric,
        long startTime, long endTime) {
    Map<String, List<Value>> instanceValuesMap = new HashMap<>();

    String sql = """
            SELECT record_time, metric_type, int32_value, double_value, str_value, labels
            FROM %s.%s
            WHERE instance = ? AND app = ? AND metrics = ? AND metric = ?
            AND record_time >= ? AND record_time <= ?
            ORDER BY record_time DESC
            LIMIT %d
            """.formatted(DATABASE_NAME, TABLE_NAME, MAX_QUERY_LIMIT);

    try (Connection conn = dataSource.getConnection();
            PreparedStatement pstmt = conn.prepareStatement(sql)) {

        pstmt.setString(1, instance);
        pstmt.setString(2, app);
        pstmt.setString(3, metrics);
        pstmt.setString(4, metric);
        // Epoch seconds -> millisecond Timestamps for the DATETIME column.
        pstmt.setTimestamp(5, new Timestamp(startTime * 1000));
        pstmt.setTimestamp(6, new Timestamp(endTime * 1000));

        try (ResultSet rs = pstmt.executeQuery()) {
            while (rs.next()) {
                Timestamp recordTime = rs.getTimestamp("record_time");
                byte metricType = rs.getByte("metric_type");
                String labels = rs.getString("labels");

                // Pick the value column that matches the stored metric type.
                String valueStr;
                if (metricType == METRIC_TYPE_NUMBER) {
                    double doubleValue = rs.getDouble("double_value");
                    if (!rs.wasNull()) {
                        valueStr = BigDecimal.valueOf(doubleValue)
                                .setScale(4, RoundingMode.HALF_UP)
                                .stripTrailingZeros()
                                .toPlainString();
                    } else {
                        continue;
                    }
                } else if (metricType == METRIC_TYPE_TIME) {
                    int intValue = rs.getInt("int32_value");
                    if (!rs.wasNull()) {
                        valueStr = String.valueOf(intValue);
                    } else {
                        continue;
                    }
                } else {
                    valueStr = rs.getString("str_value");
                    if (valueStr == null) {
                        continue;
                    }
                }

                String labelKey = labels != null ? labels : "{}";
                List<Value> valueList = instanceValuesMap.computeIfAbsent(labelKey, k -> new LinkedList<>());
                valueList.add(new Value(valueStr, recordTime.getTime()));
            }
        }

        log.debug("[Doris] Query returned {} label groups", instanceValuesMap.size());

    } catch (SQLException e) {
        log.error("[Doris] Failed to query metrics data: {}", e.getMessage(), e);
    }

    return instanceValuesMap;
}
+
/**
 * Query and set aggregation values (max, min, avg).
 * Buckets the numeric rows in [startTime, endTime] by a step chosen from the
 * window size (1m under a day, 1h under a week, else 4h), aggregates per
 * labels+bucket in SQL, then annotates the already-loaded values whose
 * timestamps fall into a computed bucket.
 *
 * @param instanceValuesMap labels-JSON -> values previously loaded; mutated in place
 * @param startTime         range start, epoch seconds inclusive
 * @param endTime           range end, epoch seconds inclusive
 */
private void queryAndSetAggregations(Map<String, List<Value>> instanceValuesMap,
        String instance, String app, String metrics, String metric,
        long startTime, long endTime) {
    // Calculate step based on time range
    long duration = endTime - startTime;
    int stepSeconds;

    if (duration < Duration.ofDays(1).getSeconds()) {
        stepSeconds = 60;
    } else if (duration < Duration.ofDays(7).getSeconds()) {
        stepSeconds = 3600; // 1 hour
    } else {
        stepSeconds = 14400; // 4 hours
    }

    String aggregationSql = """
            SELECT
                labels,
                FLOOR(UNIX_TIMESTAMP(record_time) / %d) * %d AS time_bucket,
                MAX(double_value) as max_val,
                MIN(double_value) as min_val,
                AVG(double_value) as avg_val
            FROM %s.%s
            WHERE instance = ? AND app = ? AND metrics = ? AND metric = ?
            AND record_time >= ? AND record_time <= ?
            AND metric_type = %d
            GROUP BY labels, time_bucket
            ORDER BY time_bucket
            """.formatted(stepSeconds, stepSeconds, DATABASE_NAME, TABLE_NAME, METRIC_TYPE_NUMBER);

    try (Connection conn = dataSource.getConnection();
            PreparedStatement pstmt = conn.prepareStatement(aggregationSql)) {

        pstmt.setString(1, instance);
        pstmt.setString(2, app);
        pstmt.setString(3, metrics);
        pstmt.setString(4, metric);
        pstmt.setTimestamp(5, new Timestamp(startTime * 1000));
        pstmt.setTimestamp(6, new Timestamp(endTime * 1000));

        // labels-JSON -> (bucket start millis -> [max, min, avg])
        Map<String, Map<Long, double[]>> labelBucketAggregations = new HashMap<>();

        try (ResultSet rs = pstmt.executeQuery()) {
            while (rs.next()) {
                String labels = rs.getString("labels");
                long timeBucket = rs.getLong("time_bucket");
                double maxVal = rs.getDouble("max_val");
                double minVal = rs.getDouble("min_val");
                double avgVal = rs.getDouble("avg_val");

                String labelKey = labels != null ? labels : "{}";
                Map<Long, double[]> bucketMap = labelBucketAggregations.computeIfAbsent(labelKey,
                        k -> new HashMap<>());
                // Bucket key in milliseconds to match Value.getTime() below.
                bucketMap.put(timeBucket * 1000, new double[] { maxVal, minVal, avgVal });
            }
        }

        // Apply aggregations to values
        for (Map.Entry<String, List<Value>> entry : instanceValuesMap.entrySet()) {
            String labelKey = entry.getKey();
            List<Value> values = entry.getValue();
            Map<Long, double[]> bucketMap = labelBucketAggregations.get(labelKey);

            if (bucketMap != null) {
                for (Value value : values) {
                    // Find the matching bucket
                    long valueBucket = (value.getTime() / (stepSeconds * 1000L)) * (stepSeconds * 1000L);
                    double[] aggregations = bucketMap.get(valueBucket);
                    if (aggregations != null) {
                        value.setMax(formatDouble(aggregations[0]));
                        value.setMin(formatDouble(aggregations[1]));
                        value.setMean(formatDouble(aggregations[2]));
                    }
                }
            }
        }

    } catch (SQLException e) {
        log.error("[Doris] Failed to query aggregations: {}", e.getMessage(), e);
    }
}
+
+ /**
+ * Format double value to string
+ */
+ private String formatDouble(double value) {
+ return BigDecimal.valueOf(value)
+ .setScale(4, RoundingMode.HALF_UP)
+ .stripTrailingZeros()
+ .toPlainString();
+ }
+
+ /**
+ * Get time range based on history parameter
+ */
+ private Map<String, Long> getTimeRange(String history) {
+ Instant now = Instant.now();
+ long start;
+ try {
+ if (NumberUtils.isParsable(history)) {
+ start = NumberUtils.toLong(history);
+ start = ZonedDateTime.now().toEpochSecond() - start;
+ } else {
+ TemporalAmount temporalAmount =
TimePeriodUtil.parseTokenTime(history);
+ assert temporalAmount != null;
+ Instant dateTime = now.minus(temporalAmount);
+ start = dateTime.getEpochSecond();
+ }
+ } catch (Exception e) {
+ log.error("[Doris] History time error: {}. Using default: 6h",
e.getMessage());
+ start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+ }
+ long end = now.getEpochSecond();
+ return Map.of(LABEL_KEY_START_TIME, start, LABEL_KEY_END_TIME, end);
+ }
+
+ @Override
+ public void saveLogData(LogEntry logEntry) {
+ if (logEntry == null) {
+ return;
+ }
+ saveLogDataBatch(List.of(logEntry));
+ }
+
+ @Override
+ public void saveLogDataBatch(List<LogEntry> logEntries) {
+ if (!isServerAvailable() || logEntries == null ||
logEntries.isEmpty()) {
+ return;
+ }
+
+ int total = logEntries.size();
+ for (int i = 0; i < total; i += LOG_BATCH_SIZE) {
+ int end = Math.min(i + LOG_BATCH_SIZE, total);
+ List<LogEntry> batch = logEntries.subList(i, end);
+ if (WRITE_MODE_STREAM.equals(writeMode) && logStreamLoadWriter !=
null) {
+ boolean success = logStreamLoadWriter.writeLogs(batch);
+ if (!success) {
+ if (writeConfig.fallbackToJdbcOnFailure()) {
+ log.warn("[Doris] Log Stream Load failed,
fallbackToJdbcOnFailure=true, use JDBC for this batch");
+ doSaveLogDataJdbc(batch);
+ } else {
+ log.error("[Doris] Log Stream Load failed and JDBC
fallback is disabled. rows={}", batch.size());
+ }
+ }
+ } else {
+ doSaveLogDataJdbc(batch);
+ }
+ }
+ }
+
    /**
     * Insert log entries via a single JDBC batch inside one transaction.
     * All rows commit together; on any SQLException the whole batch is rolled
     * back, the error is logged, and the method returns without throwing
     * (best-effort persistence — callers do not observe the failure).
     * Nullable Integer columns are bound with setNull to preserve SQL NULL.
     */
    private void doSaveLogDataJdbc(List<LogEntry> logEntries) {
        if (logEntries == null || logEntries.isEmpty()) {
            return;
        }

        String insertSql = """
                INSERT INTO %s.%s (
                time_unix_nano, trace_id, span_id, event_time, observed_time_unix_nano,
                severity_number, severity_text, body, trace_flags, attributes, resource,
                instrumentation_scope, dropped_attributes_count
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """.formatted(DATABASE_NAME, LOG_TABLE_NAME);

        try (Connection conn = dataSource.getConnection()) {
            // Manual transaction: either the whole batch lands or none of it.
            conn.setAutoCommit(false);
            try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
                for (LogEntry logEntry : logEntries) {
                    // Missing event time defaults to "now"; missing observed
                    // time defaults to the (normalized) event time.
                    long timeUnixNano = normalizeTimeUnixNano(logEntry.getTimeUnixNano());
                    long observedTimeUnixNano = logEntry.getObservedTimeUnixNano() != null
                            ? logEntry.getObservedTimeUnixNano()
                            : timeUnixNano;

                    pstmt.setLong(1, timeUnixNano);
                    pstmt.setString(2, logEntry.getTraceId());
                    pstmt.setString(3, logEntry.getSpanId());
                    // event_time is the same instant as time_unix_nano, at ms precision.
                    pstmt.setTimestamp(4, nanosToTimestamp(timeUnixNano));
                    pstmt.setLong(5, observedTimeUnixNano);

                    if (logEntry.getSeverityNumber() != null) {
                        pstmt.setInt(6, logEntry.getSeverityNumber());
                    } else {
                        pstmt.setNull(6, java.sql.Types.INTEGER);
                    }
                    pstmt.setString(7, logEntry.getSeverityText());
                    // Structured fields are stored as JSON text.
                    pstmt.setString(8, JsonUtil.toJson(logEntry.getBody()));
                    if (logEntry.getTraceFlags() != null) {
                        pstmt.setInt(9, logEntry.getTraceFlags());
                    } else {
                        pstmt.setNull(9, java.sql.Types.INTEGER);
                    }
                    pstmt.setString(10, JsonUtil.toJson(logEntry.getAttributes()));
                    pstmt.setString(11, JsonUtil.toJson(logEntry.getResource()));
                    pstmt.setString(12, JsonUtil.toJson(logEntry.getInstrumentationScope()));
                    if (logEntry.getDroppedAttributesCount() != null) {
                        pstmt.setInt(13, logEntry.getDroppedAttributesCount());
                    } else {
                        pstmt.setNull(13, java.sql.Types.INTEGER);
                    }

                    pstmt.addBatch();
                }
                pstmt.executeBatch();
                conn.commit();
            } catch (SQLException e) {
                // Roll back the partial batch, then surface to the outer catch for logging.
                conn.rollback();
                throw e;
            }
        } catch (SQLException e) {
            log.error("[Doris] Failed to save log data: {}", e.getMessage(), e);
        }
    }
+
+ @Override
+ public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long
endTime, String traceId,
+ String spanId, Integer
severityNumber,
+ String severityText,
String searchContent) {
+ StringBuilder sql = new StringBuilder("""
+ SELECT time_unix_nano, observed_time_unix_nano,
severity_number, severity_text, body,
+ trace_id, span_id, trace_flags, attributes, resource,
instrumentation_scope, dropped_attributes_count
+ FROM %s.%s
+ """.formatted(DATABASE_NAME, LOG_TABLE_NAME));
+ List<Object> params = new ArrayList<>();
+ appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId,
severityNumber, severityText, searchContent);
+ sql.append(" ORDER BY time_unix_nano DESC");
+ return executeLogQuery(sql.toString(), params,
"queryLogsByMultipleConditions");
+ }
+
+ @Override
+ public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
+ String
spanId, Integer severityNumber,
+ String
severityText, String searchContent,
+ Integer
offset, Integer limit) {
+ StringBuilder sql = new StringBuilder("""
+ SELECT time_unix_nano, observed_time_unix_nano,
severity_number, severity_text, body,
+ trace_id, span_id, trace_flags, attributes, resource,
instrumentation_scope, dropped_attributes_count
+ FROM %s.%s
+ """.formatted(DATABASE_NAME, LOG_TABLE_NAME));
+ List<Object> params = new ArrayList<>();
+ appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId,
severityNumber, severityText, searchContent);
+ sql.append(" ORDER BY time_unix_nano DESC");
+ if (limit != null && limit > 0) {
+ sql.append(" LIMIT ?");
+ params.add(limit);
+ if (offset != null && offset > 0) {
+ sql.append(" OFFSET ?");
+ params.add(offset);
+ }
+ }
+ return executeLogQuery(sql.toString(), params,
"queryLogsByMultipleConditionsWithPagination");
+ }
+
+ @Override
+ public long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
+ String spanId, Integer
severityNumber,
+ String severityText, String
searchContent) {
+ StringBuilder sql = new StringBuilder("SELECT COUNT(*) AS count FROM
%s.%s"
+ .formatted(DATABASE_NAME, LOG_TABLE_NAME));
+ List<Object> params = new ArrayList<>();
+ appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId,
severityNumber, severityText, searchContent);
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement(sql.toString())) {
+ bindParameters(pstmt, params);
+ try (ResultSet rs = pstmt.executeQuery()) {
+ if (rs.next()) {
+ return rs.getLong("count");
+ }
+ return 0;
+ }
+ } catch (Exception e) {
+ log.error("[Doris] countLogsByMultipleConditions error: {}",
e.getMessage(), e);
+ return 0;
+ }
+ }
+
+ @Override
+ public boolean batchDeleteLogs(List<Long> timeUnixNanos) {
+ if (!isServerAvailable() || timeUnixNanos == null ||
timeUnixNanos.isEmpty()) {
+ return false;
+ }
+ List<Long> validTimeUnixNanos = timeUnixNanos.stream()
+ .filter(Objects::nonNull)
+ .toList();
+ if (validTimeUnixNanos.isEmpty()) {
+ return false;
+ }
+
+ String placeholders = String.join(",",
Collections.nCopies(validTimeUnixNanos.size(), "?"));
+ String sql = "DELETE FROM %s.%s WHERE time_unix_nano IN (%s)"
+ .formatted(DATABASE_NAME, LOG_TABLE_NAME, placeholders);
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement(sql)) {
+ int index = 1;
+ for (Long timeUnixNano : validTimeUnixNanos) {
+ pstmt.setLong(index++, timeUnixNano);
+ }
+ pstmt.executeUpdate();
+ return true;
+ } catch (Exception e) {
+ log.error("[Doris] batchDeleteLogs error: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+
+ private void appendLogWhereClause(StringBuilder sql, List<Object> params,
+ Long startTime, Long endTime,
+ String traceId, String spanId,
+ Integer severityNumber, String
severityText,
+ String searchContent) {
+ List<String> conditions = new ArrayList<>();
+ if (startTime != null) {
+ conditions.add("time_unix_nano >= ?");
+ params.add(msToNs(startTime));
+ }
+ if (endTime != null) {
+ conditions.add("time_unix_nano <= ?");
+ params.add(msToNs(endTime));
+ }
+ if (StringUtils.hasText(traceId)) {
+ conditions.add("trace_id = ?");
+ params.add(traceId);
+ }
+ if (StringUtils.hasText(spanId)) {
+ conditions.add("span_id = ?");
+ params.add(spanId);
+ }
+ if (severityNumber != null) {
+ conditions.add("severity_number = ?");
+ params.add(severityNumber);
+ }
+ if (StringUtils.hasText(severityText)) {
+ conditions.add("severity_text = ?");
+ params.add(severityText);
+ }
+ if (StringUtils.hasText(searchContent)) {
+ conditions.add("body LIKE ?");
+ params.add("%" + searchContent + "%");
+ }
+ if (!conditions.isEmpty()) {
+ sql.append(" WHERE ").append(String.join(" AND ", conditions));
+ }
+ }
+
+ private void bindParameters(PreparedStatement pstmt, List<Object> params)
throws SQLException {
+ for (int i = 0; i < params.size(); i++) {
+ pstmt.setObject(i + 1, params.get(i));
+ }
+ }
+
+ private List<LogEntry> executeLogQuery(String sql, List<Object> params,
String queryName) {
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement pstmt = conn.prepareStatement(sql)) {
+ bindParameters(pstmt, params);
+ try (ResultSet rs = pstmt.executeQuery()) {
+ return mapRowsToLogEntries(rs);
+ }
+ } catch (Exception e) {
+ log.error("[Doris] {} error: {}", queryName, e.getMessage(), e);
+ return List.of();
+ }
+ }
+
+ private List<LogEntry> mapRowsToLogEntries(ResultSet rs) throws
SQLException {
+ List<LogEntry> entries = new LinkedList<>();
+ while (rs.next()) {
+ String instrumentationScopeStr =
rs.getString("instrumentation_scope");
+ LogEntry.InstrumentationScope instrumentationScope = null;
+ if (StringUtils.hasText(instrumentationScopeStr)) {
+ instrumentationScope =
JsonUtil.fromJson(instrumentationScopeStr, LogEntry.InstrumentationScope.class);
+ }
+
+ LogEntry entry = LogEntry.builder()
+ .timeUnixNano(rs.getLong("time_unix_nano"))
+ .observedTimeUnixNano(rs.getLong("observed_time_unix_nano"))
+ .severityNumber(getNullableInteger(rs, "severity_number"))
+ .severityText(rs.getString("severity_text"))
+ .body(parseJsonMaybe(rs.getString("body")))
+ .traceId(rs.getString("trace_id"))
+ .spanId(rs.getString("span_id"))
+ .traceFlags(getNullableInteger(rs, "trace_flags"))
+
.attributes(castToMap(parseJsonMaybe(rs.getString("attributes"))))
+ .resource(castToMap(parseJsonMaybe(rs.getString("resource"))))
+ .instrumentationScope(instrumentationScope)
+ .droppedAttributesCount(getNullableInteger(rs,
"dropped_attributes_count"))
+ .build();
+ entries.add(entry);
+ }
+ return entries;
+ }
+
+ private Integer getNullableInteger(ResultSet rs, String columnName) throws
SQLException {
+ int value = rs.getInt(columnName);
+ return rs.wasNull() ? null : value;
+ }
+
+ private static long msToNs(Long ms) {
+ return ms * NANOS_PER_MILLISECOND;
+ }
+
+ private long normalizeTimeUnixNano(Long timeUnixNano) {
+ return timeUnixNano != null ? timeUnixNano :
System.currentTimeMillis() * NANOS_PER_MILLISECOND;
+ }
+
+ private Timestamp nanosToTimestamp(long timeUnixNano) {
+ return new Timestamp(timeUnixNano / NANOS_PER_MILLISECOND);
+ }
+
+ private Object parseJsonMaybe(String value) {
+ if (!StringUtils.hasText(value)) {
+ return null;
+ }
+ String trimmed = value.trim();
+ boolean maybeJson = (trimmed.startsWith("{") && trimmed.endsWith("}"))
+ || (trimmed.startsWith("[") && trimmed.endsWith("]"))
+ || (trimmed.startsWith("\"") && trimmed.endsWith("\""));
+ if (!maybeJson) {
+ return trimmed;
+ }
+ Object parsed = JsonUtil.fromJson(trimmed, Object.class);
+ return parsed != null ? parsed : trimmed;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> castToMap(Object value) {
+ if (value instanceof Map<?, ?> map) {
+ return (Map<String, Object>) map;
+ }
+ return null;
+ }
+
    /**
     * Graceful shutdown: stop the background flush task, drain and persist any
     * buffered metric rows, then release the JDBC pool and Stream Load writers.
     * Ordering matters — the final flush runs before the pool is closed because
     * doSaveData still needs connections.
     */
    @Override
    public void destroy() {
        log.info("[Doris] Shutting down...");

        // Signal the background flush loop to exit on its next iteration.
        flushThreadRunning = false;

        // Wait flush task to finish.
        if (flushTaskStarted) {
            try {
                boolean stopped = flushTaskStopped.await(10000, TimeUnit.MILLISECONDS);
                if (!stopped) {
                    log.warn("[Doris] Timed out waiting for flush task to stop");
                }
            } catch (InterruptedException e) {
                // Preserve interrupt status and keep shutting down.
                Thread.currentThread().interrupt();
            }
        }

        // Flush remaining data
        List<DorisMetricRow> remaining = new ArrayList<>();
        metricsBufferQueue.drainTo(remaining);
        if (!remaining.isEmpty()) {
            log.info("[Doris] Flushing {} remaining metrics before shutdown", remaining.size());
            doSaveData(remaining);
        }

        // Close data source (only after the final flush above).
        if (dataSource != null && !dataSource.isClosed()) {
            dataSource.close();
            log.info("[Doris] Connection pool closed.");
        }

        // Close Stream Load writer
        if (streamLoadWriter != null) {
            streamLoadWriter.close();
            log.info("[Doris] Stream Load writer closed.");
        }
        if (logStreamLoadWriter != null) {
            logStreamLoadWriter.close();
            log.info("[Doris] Log Stream Load writer closed.");
        }
    }
+
+}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
new file mode 100644
index 0000000000..f921ce95ef
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.sql.Timestamp;
+
+/**
+ * Internal class to represent a metric row for Doris storage.
+ * This class is used by both DorisDataStorage and DorisStreamLoadWriter.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class DorisMetricRow {
+ public String instance;
+ public String app;
+ public String metrics;
+ public String metric;
+ public byte metricType;
+ public Integer int32Value;
+ public Double doubleValue;
+ public String strValue;
+ public Timestamp recordTime;
+ public String labels;
+}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
new file mode 100644
index 0000000000..f8f43ae0f7
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import org.apache.hertzbeat.common.constants.ConfigConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.warehouse.constants.WarehouseConstants;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.bind.DefaultValue;
+
/**
 * Apache Doris configuration properties.
 * Bound via Spring Boot constructor binding under the
 * {@code warehouse.store.doris} prefix. Nested configs are normalized to safe
 * defaults in their compact constructors, so downstream code never sees
 * null or out-of-range settings.
 */
@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.WAREHOUSE
        + SignConstants.DOT
        + WarehouseConstants.STORE
        + SignConstants.DOT
        + WarehouseConstants.HistoryName.DORIS)
public record DorisProperties(
        @DefaultValue("false") boolean enabled,
        @DefaultValue("jdbc:mysql://127.0.0.1:9030/hertzbeat") String url,
        String username,
        String password,
        TableConfig tableConfig,
        PoolConfig poolConfig,
        WriteConfig writeConfig) {
    /**
     * Table structure configuration
     */
    public record TableConfig(
            // Whether to enable dynamic partitioning (default disabled)
            @DefaultValue("false") boolean enablePartition,
            // Partition time unit: DAY, HOUR, MONTH
            @DefaultValue("DAY") String partitionTimeUnit,
            // Dynamic partition retention days
            @DefaultValue("7") int partitionRetentionDays,
            // Dynamic partition creation range (future partitions to create)
            @DefaultValue("3") int partitionFutureDays,
            // Number of buckets
            @DefaultValue("8") int buckets,
            // Number of replicas (recommended 3 for production)
            @DefaultValue("1") int replicationNum,
            // Maximum length of string columns
            @DefaultValue("4096") int strColumnMaxLength) {
        // Compact constructor: silently coerce invalid values to defaults
        // rather than failing application startup.
        public TableConfig {
            if (partitionRetentionDays <= 0) {
                partitionRetentionDays = 7;
            }
            if (partitionFutureDays <= 0) {
                partitionFutureDays = 3;
            }
            if (buckets <= 0) {
                buckets = 8;
            }
            if (replicationNum <= 0) {
                replicationNum = 1;
            }
            // 65533 is Doris' VARCHAR upper bound.
            if (strColumnMaxLength <= 0 || strColumnMaxLength > 65533) {
                strColumnMaxLength = 4096;
            }
        }
    }

    /**
     * Connection pool configuration (based on HikariCP)
     */
    public record PoolConfig(
            // Minimum idle connections
            @DefaultValue("5") int minimumIdle,
            // Maximum pool size
            @DefaultValue("20") int maximumPoolSize,
            // Connection timeout in milliseconds
            @DefaultValue("30000") int connectionTimeout,
            // Maximum connection lifetime in milliseconds (0 means no limit)
            @DefaultValue("0") long maxLifetime,
            // Idle connection timeout in milliseconds (0 means never recycle)
            @DefaultValue("600000") long idleTimeout) {
        public PoolConfig {
            if (minimumIdle <= 0) {
                minimumIdle = 5;
            }
            if (maximumPoolSize <= 0) {
                maximumPoolSize = 20;
            }
            // HikariCP requires minimumIdle <= maximumPoolSize.
            if (minimumIdle > maximumPoolSize) {
                minimumIdle = maximumPoolSize;
            }
            if (connectionTimeout <= 0) {
                connectionTimeout = 30000;
            }
        }
    }

    /**
     * Write configuration
     */
    public record WriteConfig(
            // Write mode: jdbc (batch insert) or stream (HTTP stream load)
            @DefaultValue("jdbc") String writeMode,
            // Batch write size (for jdbc mode)
            @DefaultValue("1000") int batchSize,
            // Batch write flush interval in seconds (for jdbc mode)
            @DefaultValue("5") int flushInterval,
            // Fallback to JDBC when stream load fails (may introduce duplicate data in ambiguous cases)
            @DefaultValue("false") boolean fallbackToJdbcOnFailure,
            // Stream load configuration (for stream mode)
            StreamLoadConfig streamLoadConfig) {
        public WriteConfig {
            // Normalize write mode; anything other than jdbc/stream falls back to jdbc.
            String normalizedWriteMode = writeMode == null ? "" : writeMode.trim().toLowerCase();
            if (!"jdbc".equals(normalizedWriteMode) && !"stream".equals(normalizedWriteMode)) {
                writeMode = "jdbc";
            } else {
                writeMode = normalizedWriteMode;
            }
            if (batchSize <= 0) {
                batchSize = 1000;
            }
            if (flushInterval <= 0) {
                flushInterval = 5;
            }
            if (streamLoadConfig == null) {
                streamLoadConfig = StreamLoadConfig.createDefault();
            }
        }
    }

    /**
     * Stream Load configuration for HTTP-based streaming writes
     */
    public record StreamLoadConfig(
            // Doris FE HTTP port for Stream Load API
            @DefaultValue(":8030") String httpPort,
            // Stream load timeout in seconds
            @DefaultValue("60") int timeout,
            // Max batch size in bytes for stream load
            @DefaultValue("10485760") int maxBytesPerBatch,
            // Maximum allowed filter ratio in [0,1]
            @DefaultValue("0.1") double maxFilterRatio,
            // Enable strict mode
            @DefaultValue("false") boolean strictMode,
            // Import timezone, empty means Doris default
            @DefaultValue("") String timezone,
            // Redirect policy: direct/public/private
            @DefaultValue("") String redirectPolicy,
            // Group commit mode: async_mode/sync_mode/off_mode
            @DefaultValue("") String groupCommit,
            // Send batch parallelism, 0 means Doris default
            @DefaultValue("0") int sendBatchParallelism,
            // Retry times for one label when stream load is retryable
            @DefaultValue("2") int retryTimes) {
        public StreamLoadConfig {
            if (httpPort == null || httpPort.isBlank()) {
                httpPort = ":8030";
            } else {
                httpPort = httpPort.trim();
            }
            if (timeout <= 0) {
                timeout = 60;
            }
            if (maxBytesPerBatch <= 0) {
                maxBytesPerBatch = 10485760; // 10MB
            }
            if (maxFilterRatio < 0 || maxFilterRatio > 1) {
                maxFilterRatio = 0.1;
            }
            if (sendBatchParallelism < 0) {
                sendBatchParallelism = 0;
            }
            if (retryTimes < 0) {
                retryTimes = 2;
            }
            // Unknown enum-like strings are reset to "" (Doris default behavior).
            if (!isValidRedirectPolicy(redirectPolicy)) {
                redirectPolicy = "";
            }
            if (!isValidGroupCommit(groupCommit)) {
                groupCommit = "";
            }
            timezone = timezone == null ? "" : timezone.trim();
        }

        /**
         * Factory method to create default StreamLoadConfig
         */
        public static StreamLoadConfig createDefault() {
            return new StreamLoadConfig(":8030", 60, 10485760,
                    0.1, false, "", "", "", 0, 2);
        }

        // Accepts blank (meaning "use Doris default") or one of the known policies.
        private static boolean isValidRedirectPolicy(String value) {
            if (value == null || value.isBlank()) {
                return true;
            }
            return "direct".equalsIgnoreCase(value)
                    || "public".equalsIgnoreCase(value)
                    || "private".equalsIgnoreCase(value);
        }

        // Accepts blank (meaning "group commit off / default") or a known mode.
        private static boolean isValidGroupCommit(String value) {
            if (value == null || value.isBlank()) {
                return true;
            }
            return "async_mode".equalsIgnoreCase(value)
                    || "sync_mode".equalsIgnoreCase(value)
                    || "off_mode".equalsIgnoreCase(value);
        }
    }

    // Provide default values for nested configs if null
    public DorisProperties {
        if (tableConfig == null) {
            tableConfig = new TableConfig(false, "DAY", 7, 3, 8, 1, 4096);
        }
        if (poolConfig == null) {
            poolConfig = new PoolConfig(5, 20, 30000, 0, 600000);
        }
        if (writeConfig == null) {
            writeConfig = new WriteConfig("jdbc", 1000, 5, false, StreamLoadConfig.createDefault());
        }
    }
}
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
new file mode 100644
index 0000000000..d45213a91e
--- /dev/null
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.http.Header;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpHeaders;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.ProtocolException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Doris Stream Load writer using Apache HttpClient for high-throughput writes.
+ * Based on official Apache Doris Stream Load Java example.
+ * <p>
+ * Reference: <a
href="https://github.com/apache/doris/blob/master/samples/doris-stream-load-demo/src/main/java/DorisStreamLoad.java">...</a>
+ */
+@Slf4j
+public class DorisStreamLoadWriter {
+
+ private static final int CONNECT_TIMEOUT = 30000;
+ private static final int SOCKET_TIMEOUT = 60000;
+ private static final int CONNECTION_REQUEST_TIMEOUT = 5000;
+ private static final int MAX_TOTAL_CONNECTIONS = 20;
+ private static final int MAX_PER_ROUTE_CONNECTIONS = 10;
+ private static final long RETRY_BACKOFF_MS = 200L;
+ private static final DateTimeFormatter DORIS_DATETIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+ private static final String METRIC_JSON_PATHS =
+
"[\"$.instance\",\"$.app\",\"$.metrics\",\"$.metric\",\"$.recordTime\",\"$.metricType\",\"$.int32Value\",\"$.doubleValue\",\"$.strValue\",\"$.labels\"]";
+ private static final String METRIC_COLUMNS =
+
"instance,app,metrics,metric,record_time,metric_type,int32_value,double_value,str_value,labels";
+ private static final String LOG_JSON_PATHS =
+
"[\"$.timeUnixNano\",\"$.observedTimeUnixNano\",\"$.eventTime\",\"$.severityNumber\",\"$.severityText\""
+ +
",\"$.body\",\"$.traceId\",\"$.spanId\",\"$.traceFlags\",\"$.attributes\",\"$.resource\""
+ +
",\"$.instrumentationScope\",\"$.droppedAttributesCount\"]";
+ private static final String LOG_COLUMNS =
+
"time_unix_nano,observed_time_unix_nano,event_time,severity_number,severity_text,body,trace_id,span_id,trace_flags,attributes,resource,instrumentation_scope,dropped_attributes_count";
+
+ public static final String STATUS_SUCCESS = "Success";
+ public static final String STATUS_PUBLISH_TIMEOUT = "Publish Timeout";
+ public static final String STATUS_FAIL = "Fail";
+ public static final String STATUS_LABEL_ALREADY_EXISTS = "Label Already
Exists";
+
+ private final String databaseName;
+ private final String tableName;
+ private final String feHost;
+ private final int feHttpPort;
+ private final String username;
+ private final String password;
+ private final DorisProperties.StreamLoadConfig config;
+
+ private final AtomicLong transactionId = new AtomicLong(0);
+ private final CloseableHttpClient httpClient;
+
    /**
     * Outcome classification of one Stream Load HTTP attempt, used to decide
     * whether retrying the same label is worthwhile.
     */
    private enum LoadResult {
        SUCCESS,
        RETRYABLE_FAILURE,
        NON_RETRYABLE_FAILURE
    }
+
    /**
     * JSON row shape for metric Stream Load payloads.
     * Field names must match METRIC_JSON_PATHS exactly — Doris maps JSON keys
     * to columns by those paths, so do not rename fields without updating the
     * paths. NOTE(review): fields are public presumably because JsonUtil's
     * serializer reads public fields — confirm before making them private.
     */
    private static final class StreamLoadMetricRow {
        public String instance;
        public String app;
        public String metrics;
        public String metric;
        // Pre-formatted "yyyy-MM-dd HH:mm:ss.SSS" datetime string.
        public String recordTime;
        public Byte metricType;
        public Integer int32Value;
        public Double doubleValue;
        public String strValue;
        public String labels;
    }
+
    /**
     * JSON row shape for log Stream Load payloads.
     * Field names must line up with LOG_JSON_PATHS / LOG_COLUMNS; keep all
     * three in sync when the log table schema changes. NOTE(review): public
     * fields presumably required by JsonUtil's field-based serialization —
     * confirm before hiding them.
     */
    private static final class StreamLoadLogRow {
        public Long timeUnixNano;
        public Long observedTimeUnixNano;
        public String eventTime;
        public Integer severityNumber;
        public String severityText;
        // Nested structures are pre-serialized to JSON strings.
        public String body;
        public String traceId;
        public String spanId;
        public Integer traceFlags;
        public String attributes;
        public String resource;
        public String instrumentationScope;
        public Integer droppedAttributesCount;
    }
+
+ /**
+ * -- GETTER --
+ * Check if writer is available
+ */
+ @Getter
+ private volatile boolean available = true;
+
    /**
     * Create a Stream Load writer bound to one target table.
     *
     * @param databaseName target Doris database
     * @param tableName    target Doris table
     * @param jdbcUrl      JDBC URL of the FE; only the host part is reused here,
     *                     the HTTP port comes from {@code config}
     * @param username     Doris user for HTTP basic auth
     * @param password     Doris password for HTTP basic auth
     * @param config       Stream Load tuning options (port, timeouts, retries)
     */
    public DorisStreamLoadWriter(String databaseName, String tableName,
                                 String jdbcUrl, String username, String password,
                                 DorisProperties.StreamLoadConfig config) {
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.username = username;
        this.password = password;
        this.config = config;

        // Stream Load is HTTP-based: reuse the JDBC host, but switch to the FE HTTP port.
        this.feHost = parseHostFromJdbcUrl(jdbcUrl);
        this.feHttpPort = parseHttpPort(config.httpPort());

        // Create HTTP client with connection pool and redirect support
        this.httpClient = createHttpClient();

        log.info("[Doris StreamLoad] Writer initialized for {}.{}", databaseName, tableName);
    }
+
+ private String parseHostFromJdbcUrl(String jdbcUrl) {
+ if (jdbcUrl == null || jdbcUrl.isBlank()) {
+ return "127.0.0.1";
+ }
+
+ String hostPart = jdbcUrl;
+ if (hostPart.startsWith("jdbc:mysql://")) {
+ hostPart = hostPart.substring("jdbc:mysql://".length());
+ }
+
+ int slashIndex = hostPart.indexOf('/');
+ if (slashIndex > 0) {
+ hostPart = hostPart.substring(0, slashIndex);
+ }
+
+ int queryIndex = hostPart.indexOf('?');
+ if (queryIndex > 0) {
+ hostPart = hostPart.substring(0, queryIndex);
+ }
+
+ // Multi-host URL: use first host
+ int commaIndex = hostPart.indexOf(',');
+ if (commaIndex > 0) {
+ hostPart = hostPart.substring(0, commaIndex);
+ }
+
+ int colonIndex = hostPart.lastIndexOf(':');
+ if (colonIndex > 0) {
+ hostPart = hostPart.substring(0, colonIndex);
+ }
+
+ return hostPart;
+ }
+
+ private int parseHttpPort(String portStr) {
+ if (portStr == null || portStr.isBlank()) {
+ return 8030;
+ }
+ if (portStr.startsWith(":")) {
+ portStr = portStr.substring(1);
+ }
+ try {
+ return Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ return 8030; // default
+ }
+ }
+
    /**
     * Create the HTTP client used for Stream Load requests, with a pooled
     * connection manager and a custom redirect strategy.
     *
     * <p>Doris FE answers a Stream Load PUT with a 307 redirect to a BE node.
     * In cloud/K8s deployments the BE may advertise an internal address that is
     * unreachable from HertzBeat, so the strategy rewrites localhost/private
     * redirect targets to the externally configured FE host while keeping the
     * BE port, path and query intact.
     */
    private CloseableHttpClient createHttpClient() {
        // Connection pool configuration
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
        connectionManager.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);

        // Request configuration (timeouts for connect / socket read / pool checkout)
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(CONNECT_TIMEOUT)
                .setSocketTimeout(SOCKET_TIMEOUT)
                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
                .build();

        // Custom redirect strategy to handle PUT requests and replace internal IPs.
        // DefaultRedirectStrategy only redirects GET/HEAD by default.
        DefaultRedirectStrategy redirectStrategy = new DefaultRedirectStrategy() {
            @Override
            protected boolean isRedirectable(String method) {
                // Enable redirect for PUT method (required for Doris Stream Load)
                return true;
            }

            @Override
            public HttpUriRequest getRedirect(
                    HttpRequest request,
                    HttpResponse response,
                    HttpContext context) throws ProtocolException {
                URI uri = getLocationURI(request, response, context);
                String redirectLocation = uri.toString();

                // Check if redirect points to localhost/internal IP.
                // NOTE(review): the 172.16.0.0/12 private range is not matched
                // here — confirm whether that omission is intentional.
                String host = uri.getHost();
                if (host != null && ("127.0.0.1".equals(host) || "localhost".equals(host)
                        || host.startsWith("192.168.") || host.startsWith("10.")
                        || "0.0.0.0".equals(host))) {
                    // Replace internal IP with external FE host, keep BE port and path.
                    // 8040 is assumed to be the BE webserver port when none is given.
                    int port = uri.getPort() > 0 ? uri.getPort() : 8040;
                    String path = uri.getPath();
                    String query = uri.getQuery();
                    redirectLocation = "http://" + feHost + ":" + port + path
                            + (query != null ? "?" + query : "");
                    log.debug("[Doris StreamLoad] Replaced internal IP {} with external {}:{}",
                            host, feHost, port);
                    try {
                        uri = new URI(redirectLocation);
                    } catch (URISyntaxException e) {
                        // Keep the original URI on failure; the request may still work
                        log.warn("[Doris StreamLoad] Failed to build URI for redirect location", e);
                    }
                }

                // Remove any embedded credentials like http://root:@ so they are
                // not leaked to the redirect target URL
                if (redirectLocation.contains("@")) {
                    try {
                        URIBuilder uriBuilder = new URIBuilder(redirectLocation);
                        uriBuilder.setUserInfo(null);
                        uri = uriBuilder.build();
                    } catch (URISyntaxException e) {
                        log.warn("[Doris StreamLoad] Failed to remove credentials from redirect URL", e);
                    }
                }

                // Create new redirect request, re-attaching the original JSON body
                HttpPut redirect = new HttpPut(uri);
                if (request instanceof HttpEntityEnclosingRequest sourceRequest
                        && sourceRequest.getEntity() != null) {
                    redirect.setEntity(sourceRequest.getEntity());
                }
                // Copy headers from original request
                copyHeaders(request, redirect);

                return redirect;
            }

            // Copy all business headers (label, format, auth, ...) onto the
            // redirected request; hop-by-hop headers are recomputed by HttpClient.
            private void copyHeaders(HttpRequest source, HttpUriRequest target) {
                // Keep all business headers for redirected PUT request.
                for (Header header : source.getAllHeaders()) {
                    String name = header.getName();
                    if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)
                            || HttpHeaders.HOST.equalsIgnoreCase(name)
                            || HttpHeaders.TRANSFER_ENCODING.equalsIgnoreCase(name)) {
                        continue;
                    }
                    target.setHeader(name, header.getValue());
                }
            }
        };

        return HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setDefaultRequestConfig(requestConfig)
                .setRedirectStrategy(redirectStrategy)
                .build();
    }
+
+ /**
+ * Build Basic Authentication header
+ */
+ private String basicAuthHeader(String username, String password) {
+ String toBeEncode = username + ":" + (password != null ? password :
"");
+ byte[] encoded =
Base64.encodeBase64(toBeEncode.getBytes(StandardCharsets.UTF_8));
+ return "Basic " + new String(encoded, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Build unique label for idempotency
+ */
+ private String buildLabel() {
+ return "hzb_" + System.currentTimeMillis() + "_" +
transactionId.incrementAndGet();
+ }
+
+ /**
+ * Build Stream Load URL
+ */
+ private String buildLoadUrl() {
+ return String.format("http://%s:%d/api/%s/%s/_stream_load",
+ feHost, feHttpPort, databaseName, tableName);
+ }
+
+ /**
+ * Write metrics data using Stream Load API
+ *
+ * @param rows list of metric rows to write
+ * @return true if write succeeded, false otherwise
+ */
+ public boolean write(List<DorisMetricRow> rows) {
+ if (!available || rows == null || rows.isEmpty()) {
+ return false;
+ }
+
+ return writeMetricWithAutoSplit(new ArrayList<>(rows));
+ }
+
+ public boolean writeLogs(List<LogEntry> logEntries) {
+ if (!available || logEntries == null || logEntries.isEmpty()) {
+ return false;
+ }
+ return writeLogWithAutoSplit(new ArrayList<>(logEntries));
+ }
+
+ private boolean writeMetricWithAutoSplit(List<DorisMetricRow> rows) {
+ List<StreamLoadMetricRow> streamLoadRows = toStreamLoadRows(rows);
+ String jsonData = JsonUtil.toJson(streamLoadRows);
+ if (jsonData == null) {
+ log.error("[Doris StreamLoad] Failed to serialize {} rows to
JSON.", rows.size());
+ return false;
+ }
+
+ int payloadBytes = jsonData.getBytes(StandardCharsets.UTF_8).length;
+ int maxBytesPerBatch = config.maxBytesPerBatch();
+ if (payloadBytes > maxBytesPerBatch && rows.size() > 1) {
+ int mid = rows.size() / 2;
+ List<DorisMetricRow> left = new ArrayList<>(rows.subList(0, mid));
+ List<DorisMetricRow> right = new ArrayList<>(rows.subList(mid,
rows.size()));
+ log.debug("[Doris StreamLoad] Split batch: rows={}, bytes={},
maxBytes={}",
+ rows.size(), payloadBytes, maxBytesPerBatch);
+ return writeMetricWithAutoSplit(left) &&
writeMetricWithAutoSplit(right);
+ }
+
+ String label = buildLabel();
+ return writeSingleBatch(rows.size(), jsonData, label,
METRIC_JSON_PATHS, METRIC_COLUMNS);
+ }
+
+ private boolean writeLogWithAutoSplit(List<LogEntry> logEntries) {
+ List<StreamLoadLogRow> streamLoadRows =
toStreamLoadLogRows(logEntries);
+ String jsonData = JsonUtil.toJson(streamLoadRows);
+ if (jsonData == null) {
+ log.error("[Doris StreamLoad] Failed to serialize {} log entries
to JSON.", logEntries.size());
+ return false;
+ }
+
+ int payloadBytes = jsonData.getBytes(StandardCharsets.UTF_8).length;
+ int maxBytesPerBatch = config.maxBytesPerBatch();
+ if (payloadBytes > maxBytesPerBatch && logEntries.size() > 1) {
+ int mid = logEntries.size() / 2;
+ List<LogEntry> left = new ArrayList<>(logEntries.subList(0, mid));
+ List<LogEntry> right = new ArrayList<>(logEntries.subList(mid,
logEntries.size()));
+ log.debug("[Doris StreamLoad] Split log batch: rows={}, bytes={},
maxBytes={}",
+ logEntries.size(), payloadBytes, maxBytesPerBatch);
+ return writeLogWithAutoSplit(left) && writeLogWithAutoSplit(right);
+ }
+
+ String label = buildLabel();
+ return writeSingleBatch(logEntries.size(), jsonData, label,
LOG_JSON_PATHS, LOG_COLUMNS);
+ }
+
+ private List<StreamLoadMetricRow> toStreamLoadRows(List<DorisMetricRow>
rows) {
+ List<StreamLoadMetricRow> result = new ArrayList<>(rows.size());
+ for (DorisMetricRow row : rows) {
+ StreamLoadMetricRow item = new StreamLoadMetricRow();
+ item.instance = row.instance;
+ item.app = row.app;
+ item.metrics = row.metrics;
+ item.metric = row.metric;
+ item.recordTime = formatRecordTime(row.recordTime);
+ item.metricType = row.metricType;
+ item.int32Value = row.int32Value;
+ item.doubleValue = row.doubleValue;
+ item.strValue = row.strValue;
+ item.labels = row.labels;
+ result.add(item);
+ }
+ return result;
+ }
+
+ private String formatRecordTime(Timestamp timestamp) {
+ if (timestamp == null) {
+ return null;
+ }
+ return
DORIS_DATETIME_FORMATTER.format(timestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
+ }
+
+ private List<StreamLoadLogRow> toStreamLoadLogRows(List<LogEntry>
logEntries) {
+ List<StreamLoadLogRow> result = new ArrayList<>(logEntries.size());
+ for (LogEntry logEntry : logEntries) {
+ long timeUnixNano = logEntry.getTimeUnixNano() != null
+ ? logEntry.getTimeUnixNano()
+ : System.currentTimeMillis() * 1_000_000L;
+ long observedTimeUnixNano = logEntry.getObservedTimeUnixNano() !=
null
+ ? logEntry.getObservedTimeUnixNano()
+ : timeUnixNano;
+
+ StreamLoadLogRow row = new StreamLoadLogRow();
+ row.timeUnixNano = timeUnixNano;
+ row.observedTimeUnixNano = observedTimeUnixNano;
+ row.eventTime = formatEpochNanos(timeUnixNano);
+ row.severityNumber = logEntry.getSeverityNumber();
+ row.severityText = logEntry.getSeverityText();
+ row.body = JsonUtil.toJson(logEntry.getBody());
+ row.traceId = logEntry.getTraceId();
+ row.spanId = logEntry.getSpanId();
+ row.traceFlags = logEntry.getTraceFlags();
+ row.attributes = JsonUtil.toJson(logEntry.getAttributes());
+ row.resource = JsonUtil.toJson(logEntry.getResource());
+ row.instrumentationScope =
JsonUtil.toJson(logEntry.getInstrumentationScope());
+ row.droppedAttributesCount = logEntry.getDroppedAttributesCount();
+ result.add(row);
+ }
+ return result;
+ }
+
+ private String formatEpochNanos(long epochNanos) {
+ long millis = epochNanos / 1_000_000L;
+ return DORIS_DATETIME_FORMATTER.format(
+
Instant.ofEpochMilli(millis).atZone(ZoneId.systemDefault()).toLocalDateTime());
+ }
+
    /**
     * Send one already-serialized batch via the Stream Load HTTP API, with
     * bounded retries and linear backoff.
     *
     * <p>The same label is reused across retry attempts so Doris can
     * deduplicate a batch whose first attempt actually committed
     * (see handleResponse's "Label Already Exists" handling).
     *
     * @param rowCount  number of rows in the payload (for logging only)
     * @param jsonData  JSON array payload to load
     * @param label     unique Stream Load label for idempotency
     * @param jsonPaths jsonpaths header mapping JSON fields to columns
     * @param columns   target column list header
     * @return true if the batch was loaded (possibly on a retry), false otherwise
     */
    private boolean writeSingleBatch(int rowCount, String jsonData, String label,
                                     String jsonPaths, String columns) {
        String loadUrl = buildLoadUrl();
        // retryTimes() is the number of retries, so total attempts = retries + 1
        int maxAttempts = Math.max(1, config.retryTimes() + 1);

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            LoadResult loadResult;
            String responseBody = "";
            int statusCode = -1;
            // Human-readable context string used in all log lines for this attempt
            String action = String.format("label=%s attempt=%d/%d rows=%d",
                    label, attempt, maxAttempts, rowCount);

            HttpPut httpPut = new HttpPut(loadUrl);
            // "Expect: 100-continue" lets FE redirect to a BE before the body is sent
            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
            httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
            httpPut.setHeader("label", label);
            httpPut.setHeader("format", "json");
            // Payload is a JSON array; each element becomes one row
            httpPut.setHeader("strip_outer_array", "true");
            httpPut.setHeader("timeout", String.valueOf(config.timeout()));
            httpPut.setHeader("max_filter_ratio", String.valueOf(config.maxFilterRatio()));
            httpPut.setHeader("strict_mode", String.valueOf(config.strictMode()));
            httpPut.setHeader("jsonpaths", jsonPaths);
            httpPut.setHeader("columns", columns);
            // Optional headers: only sent when configured with non-blank values
            setHeaderIfHasText(httpPut, "timezone", config.timezone());
            setHeaderIfHasText(httpPut, "redirect-policy", config.redirectPolicy());
            setHeaderIfHasText(httpPut, "group_commit", config.groupCommit());
            if (config.sendBatchParallelism() > 0) {
                httpPut.setHeader("send_batch_parallelism", String.valueOf(config.sendBatchParallelism()));
            }
            httpPut.setEntity(new StringEntity(jsonData, StandardCharsets.UTF_8));

            try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
                statusCode = response.getStatusLine().getStatusCode();
                if (response.getEntity() != null) {
                    responseBody = EntityUtils.toString(response.getEntity());
                }
                loadResult = handleResponse(statusCode, responseBody, rowCount);
            } catch (Exception e) {
                // Network-level failures (timeouts, resets) are assumed transient
                log.warn("[Doris StreamLoad] Request error, {}: {}", action, e.getMessage());
                loadResult = LoadResult.RETRYABLE_FAILURE;
            }

            if (loadResult == LoadResult.SUCCESS) {
                return true;
            }

            // Give up on permanent failures or when attempts are exhausted
            if (loadResult == LoadResult.NON_RETRYABLE_FAILURE || attempt >= maxAttempts) {
                log.error("[Doris StreamLoad] Batch failed, {} statusCode={} response={}",
                        action, statusCode, responseBody);
                return false;
            }

            // Linear backoff between retries (RETRY_BACKOFF_MS * attempt number)
            try {
                Thread.sleep(RETRY_BACKOFF_MS * attempt);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("[Doris StreamLoad] Retry interrupted, {}", action);
                return false;
            }
        }
        return false;
    }
+
+ private void setHeaderIfHasText(HttpPut httpPut, String name, String
value) {
+ if (value != null && !value.isBlank()) {
+ httpPut.setHeader(name, value.trim());
+ }
+ }
+
    /**
     * Handle Stream Load response
     *
     * Note: statusCode 200 only indicates the BE service is OK, not that stream load succeeded.
     * We must parse the response JSON to check the actual status.
     *
     * @param statusCode HTTP status code of the Stream Load response
     * @param body       raw response body (expected to be Doris's JSON result)
     * @param rowCount   number of rows in the batch (for logging only)
     * @return SUCCESS, RETRYABLE_FAILURE, or NON_RETRYABLE_FAILURE
     */
    private LoadResult handleResponse(int statusCode, String body, int rowCount) {
        if (statusCode != 200) {
            log.warn("[Doris StreamLoad] HTTP request returned status {}, response: {}",
                    statusCode, body);
            // 5xx = server-side/transient -> retry; 4xx = client error -> give up
            return statusCode >= 500 ? LoadResult.RETRYABLE_FAILURE : LoadResult.NON_RETRYABLE_FAILURE;
        }

        JsonNode jsonNode = JsonUtil.fromJson(body);
        if (jsonNode == null || !jsonNode.has("Status")) {
            // Not a Doris Stream Load result payload; retrying won't help
            log.error("[Doris StreamLoad] Invalid response body: {}", body);
            return LoadResult.NON_RETRYABLE_FAILURE;
        }

        String status = jsonNode.get("Status").asText("");

        if (STATUS_SUCCESS.equalsIgnoreCase(status)) {
            log.info("[Doris StreamLoad] Successfully loaded {} rows", rowCount);
            return LoadResult.SUCCESS;
        }

        // "Publish Timeout": the transaction committed but publishing timed out;
        // data will become visible, so this is treated as success (no retry,
        // which would duplicate the batch under a new label).
        if (STATUS_PUBLISH_TIMEOUT.equalsIgnoreCase(status)) {
            log.warn("[Doris StreamLoad] Publish Timeout for {} rows, treated as success. Response: {}",
                    rowCount, body);
            return LoadResult.SUCCESS;
        }

        // "Label Already Exists": a previous attempt with the same label reached
        // Doris. If that job finished/became visible, the data is already in.
        if (STATUS_LABEL_ALREADY_EXISTS.equalsIgnoreCase(status)) {
            String existingStatus = jsonNode.has("ExistingJobStatus")
                    ? jsonNode.get("ExistingJobStatus").asText("")
                    : "";
            if ("FINISHED".equalsIgnoreCase(existingStatus) || "VISIBLE".equalsIgnoreCase(existingStatus)) {
                log.info("[Doris StreamLoad] Label exists and already finished for {} rows", rowCount);
                return LoadResult.SUCCESS;
            }

            // Existing job still in progress/unknown: retry with the same label
            return LoadResult.RETRYABLE_FAILURE;
        }

        if (STATUS_FAIL.equalsIgnoreCase(status)) {
            // Explicit failure (e.g. schema mismatch, filtered rows over ratio)
            log.error("[Doris StreamLoad] Failed to load {} rows, status={}, response={}",
                    rowCount, status, body);
            return LoadResult.NON_RETRYABLE_FAILURE;
        }

        // Any other status is unknown to us; be optimistic and retry
        log.warn("[Doris StreamLoad] Unexpected status={} for {} rows, response={}",
                status, rowCount, body);
        return LoadResult.RETRYABLE_FAILURE;
    }
+
+ /**
+ * Get current statistics
+ */
+ public String getStats() {
+ return String.format("transactions=%d, available=%s",
transactionId.get(), available);
+ }
+
+ /**
+ * Close the HTTP client and release resources
+ */
+ public void close() {
+ available = false;
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ log.info("[Doris StreamLoad] HTTP client closed");
+ }
+ } catch (IOException e) {
+ log.error("[Doris StreamLoad] Failed to close HTTP client", e);
+ }
+ }
+}
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
new file mode 100644
index 0000000000..a7eff2b56e
--- /dev/null
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
/**
 * Tests for {@link DorisDataStorage}.
 *
 * <p>All JDBC interaction is mocked: {@link DriverManager} is intercepted with
 * a static mock for the storage's initialization connection, and
 * {@link HikariDataSource} construction is intercepted so the pool hands back
 * mocked connections. Tests then capture the SQL passed to
 * {@code prepareStatement} and verify the bound parameters.
 */
@ExtendWith(MockitoExtension.class)
class DorisDataStorageTest {

    // The storage converts millisecond inputs to nanoseconds before binding
    private static final long NANOS_PER_MILLISECOND = 1_000_000L;

    @Mock
    private WarehouseWorkerPool workerPool;

    @Test
    void queryLogsWithPaginationShouldContainLimitAndOffsetClauses() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
        when(context.resultSet().next()).thenReturn(false);

        // pageIndex=5, pageSize=20 -> both LIMIT and OFFSET must be emitted
        context.storage().queryLogsByMultipleConditionsWithPagination(
                1000L, 2000L, null, null, null, null, null, 5, 20
        );

        ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
        verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
        String sql = sqlCaptor.getValue();

        assertThat(sql).contains("ORDER BY time_unix_nano DESC");
        assertThat(sql).contains("LIMIT ?");
        assertThat(sql).contains("OFFSET ?");
    }

    @Test
    void queryLogsShouldBindAllConditionsWithConvertedTimeInOrder() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
        when(context.resultSet().next()).thenReturn(false);

        context.storage().queryLogsByMultipleConditions(
                1000L, 2000L, "trace-1", "span-1", 9, "INFO", "error"
        );

        ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
        verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
        String sql = sqlCaptor.getValue();

        assertThat(sql).contains("WHERE");
        assertThat(sql).contains("time_unix_nano >= ?");
        assertThat(sql).contains("time_unix_nano <= ?");
        assertThat(sql).contains("trace_id = ?");
        assertThat(sql).contains("span_id = ?");
        assertThat(sql).contains("severity_number = ?");
        assertThat(sql).contains("severity_text = ?");
        assertThat(sql).contains("body LIKE ?");
        assertThat(sql).contains("ORDER BY time_unix_nano DESC");

        // Time bounds are converted from millis to nanos; the body search term
        // is wrapped with wildcards for the LIKE clause
        verify(context.queryPreparedStatement()).setObject(1, 1000L * NANOS_PER_MILLISECOND);
        verify(context.queryPreparedStatement()).setObject(2, 2000L * NANOS_PER_MILLISECOND);
        verify(context.queryPreparedStatement()).setObject(3, "trace-1");
        verify(context.queryPreparedStatement()).setObject(4, "span-1");
        verify(context.queryPreparedStatement()).setObject(5, 9);
        verify(context.queryPreparedStatement()).setObject(6, "INFO");
        verify(context.queryPreparedStatement()).setObject(7, "%error%");
    }

    @Test
    void queryLogsWithPaginationShouldNotAppendOffsetWhenOffsetIsZero() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
        when(context.resultSet().next()).thenReturn(false);

        // pageIndex=0 -> OFFSET clause must be omitted entirely
        context.storage().queryLogsByMultipleConditionsWithPagination(
                null, null, null, null, null, null, null, 0, 20
        );

        ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
        verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
        String sql = sqlCaptor.getValue();

        assertThat(sql).contains("LIMIT ?");
        assertThat(sql).doesNotContain("OFFSET ?");
        // With no filter conditions, the page size is the first bound parameter
        verify(context.queryPreparedStatement()).setObject(1, 20);
    }

    @Test
    void countLogsShouldReturnCountAndBindConditions() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
        when(context.resultSet().next()).thenReturn(true);
        when(context.resultSet().getLong("count")).thenReturn(5L);

        long count = context.storage().countLogsByMultipleConditions(
                1000L, null, "trace-1", null, null, null, null
        );

        assertThat(count).isEqualTo(5L);

        ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
        verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
        String sql = sqlCaptor.getValue();

        assertThat(sql).contains("SELECT COUNT(*) AS count");
        assertThat(sql).contains("time_unix_nano >= ?");
        assertThat(sql).contains("trace_id = ?");
        verify(context.queryPreparedStatement()).setObject(1, 1000L * NANOS_PER_MILLISECOND);
        verify(context.queryPreparedStatement()).setObject(2, "trace-1");
    }

    @Test
    void batchDeleteLogsShouldFilterNullValuesAndExecuteDelete() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeUpdate()).thenReturn(2);

        // Null entries must be skipped; only the two non-null values are bound
        List<Long> timeUnixNanos = new ArrayList<>();
        timeUnixNanos.add(111L);
        timeUnixNanos.add(null);
        timeUnixNanos.add(333L);

        boolean deleted = context.storage().batchDeleteLogs(timeUnixNanos);

        assertThat(deleted).isTrue();

        ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
        verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
        // Placeholder count matches the filtered (non-null) value count
        assertThat(sqlCaptor.getValue()).contains("time_unix_nano IN (?,?)");
        verify(context.queryPreparedStatement()).setLong(1, 111L);
        verify(context.queryPreparedStatement()).setLong(2, 333L);
    }

    @Test
    void batchDeleteLogsShouldReturnFalseWhenAllValuesAreNull() throws Exception {
        DorisDataStorage storage = createStorageContext(createProperties(false));

        List<Long> timeUnixNanos = new ArrayList<>();
        timeUnixNanos.add(null);
        timeUnixNanos.add(null);

        // Nothing deletable -> no SQL issued and false returned
        boolean deleted = storage.batchDeleteLogs(timeUnixNanos);

        assertThat(deleted).isFalse();
    }

    @Test
    void queryLogsShouldMapJsonColumnsToLogEntryFields() throws Exception {
        QueryStorageContext context = createQueryStorageContext(createProperties(false));
        when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
        ResultSet resultSet = context.resultSet();

        // Single-row result set with all log columns populated, including the
        // JSON-typed columns that must be deserialized into LogEntry fields
        when(resultSet.next()).thenReturn(true, false);
        when(resultSet.getLong("time_unix_nano")).thenReturn(111L);
        when(resultSet.getLong("observed_time_unix_nano")).thenReturn(222L);
        when(resultSet.getInt("severity_number")).thenReturn(9);
        when(resultSet.getString("severity_text")).thenReturn("INFO");
        when(resultSet.getString("body")).thenReturn("{\"message\":\"ok\"}");
        when(resultSet.getString("trace_id")).thenReturn("trace-1");
        when(resultSet.getString("span_id")).thenReturn("span-1");
        when(resultSet.getInt("trace_flags")).thenReturn(1);
        when(resultSet.getString("attributes")).thenReturn("{\"k\":\"v\"}");
        when(resultSet.getString("resource")).thenReturn("{\"service\":\"warehouse\"}");
        when(resultSet.getString("instrumentation_scope")).thenReturn("{\"name\":\"scope\",\"version\":\"1.0.0\"}");
        when(resultSet.getInt("dropped_attributes_count")).thenReturn(3);
        when(resultSet.wasNull()).thenReturn(false, false, false);

        List<LogEntry> entries = context.storage().queryLogsByMultipleConditions(
                null, null, null, null, null, null, null
        );

        assertThat(entries).hasSize(1);
        LogEntry entry = entries.get(0);
        assertThat(entry.getTimeUnixNano()).isEqualTo(111L);
        assertThat(entry.getObservedTimeUnixNano()).isEqualTo(222L);
        assertThat(entry.getSeverityNumber()).isEqualTo(9);
        assertThat(entry.getSeverityText()).isEqualTo("INFO");
        assertThat(entry.getTraceId()).isEqualTo("trace-1");
        assertThat(entry.getSpanId()).isEqualTo("span-1");
        assertThat(entry.getBody()).isInstanceOf(Map.class);
        @SuppressWarnings("unchecked")
        Map<String, Object> body = (Map<String, Object>) entry.getBody();
        assertThat(body).containsEntry("message", "ok");
        assertThat(entry.getAttributes()).containsEntry("k", "v");
        assertThat(entry.getResource()).containsEntry("service", "warehouse");
        assertThat(entry.getInstrumentationScope()).isNotNull();
        assertThat(entry.getInstrumentationScope().getName()).isEqualTo("scope");
        assertThat(entry.getInstrumentationScope().getVersion()).isEqualTo("1.0.0");
    }

    /**
     * Build a {@link DorisDataStorage} whose initialization and query paths are
     * fully mocked, and return the mocks needed to verify query behavior.
     *
     * <p>The data source returns the table-setup connection on the first call
     * and the query connection on every subsequent call, mirroring the
     * storage's connection usage order.
     */
    private QueryStorageContext createQueryStorageContext(DorisProperties properties) {
        Connection initConnection = mock(Connection.class);
        Statement initStatement = mock(Statement.class);
        Connection tableConnection = mock(Connection.class);
        Statement tableStatement = mock(Statement.class);
        Connection queryConnection = mock(Connection.class);
        PreparedStatement queryPreparedStatement = mock(PreparedStatement.class);
        ResultSet resultSet = mock(ResultSet.class);
        AtomicInteger dataSourceConnectionCalls = new AtomicInteger(0);
        String baseUrl = properties.url();
        String username = properties.username();
        String password = properties.password();
        try {
            when(initConnection.createStatement()).thenReturn(initStatement);
            when(tableConnection.createStatement()).thenReturn(tableStatement);
            when(queryConnection.prepareStatement(anyString())).thenReturn(queryPreparedStatement);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        try (MockedStatic<DriverManager> driverManagerMock = mockStatic(DriverManager.class);
                MockedConstruction<HikariDataSource> hikariDataSourceMockedConstruction = mockConstruction(
                        HikariDataSource.class,
                        (mock, context) -> when(mock.getConnection()).thenAnswer(invocation -> {
                            // First pool connection creates tables; later ones run queries
                            int call = dataSourceConnectionCalls.getAndIncrement();
                            return call == 0 ? tableConnection : queryConnection;
                        }))) {
            driverManagerMock.when(() -> DriverManager.getConnection(baseUrl, username, password))
                    .thenReturn(initConnection);
            DorisDataStorage storage = new DorisDataStorage(properties, workerPool);
            return new QueryStorageContext(storage, queryConnection, queryPreparedStatement, resultSet);
        }
    }

    /**
     * Build a {@link DorisDataStorage} with mocked initialization only, for
     * tests that never reach the query path.
     */
    private DorisDataStorage createStorageContext(DorisProperties properties) {
        Connection initConnection = mock(Connection.class);
        Statement initStatement = mock(Statement.class);
        Connection tableConnection = mock(Connection.class);
        Statement tableStatement = mock(Statement.class);
        String baseUrl = properties.url();
        String username = properties.username();
        String password = properties.password();
        try {
            when(initConnection.createStatement()).thenReturn(initStatement);
            when(tableConnection.createStatement()).thenReturn(tableStatement);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        try (MockedStatic<DriverManager> driverManagerMock = mockStatic(DriverManager.class);
                MockedConstruction<HikariDataSource> hikariDataSourceMockedConstruction = mockConstruction(
                        HikariDataSource.class, (mock, context) -> when(mock.getConnection()).thenReturn(tableConnection))) {
            driverManagerMock.when(() -> DriverManager.getConnection(baseUrl, username, password))
                    .thenReturn(initConnection);
            return new DorisDataStorage(properties, workerPool);
        }
    }

    /**
     * Minimal valid {@link DorisProperties} for tests, with JDBC write mode.
     *
     * @param enablePartition whether dynamic partitioning is enabled in the
     *                        generated table config
     */
    private DorisProperties createProperties(boolean enablePartition) {
        DorisProperties.TableConfig tableConfig = new DorisProperties.TableConfig(
                enablePartition, "HOUR", 2, 1, 12, 1, 4096
        );
        DorisProperties.PoolConfig poolConfig = new DorisProperties.PoolConfig(
                1, 2, 500, 0, 60_000
        );
        DorisProperties.WriteConfig writeConfig = new DorisProperties.WriteConfig(
                "jdbc", 1000, 5, false, DorisProperties.StreamLoadConfig.createDefault()
        );
        return new DorisProperties(
                true,
                "jdbc:mysql://127.0.0.1:9030/hertzbeat",
                "root",
                "123456",
                tableConfig,
                poolConfig,
                writeConfig
        );
    }

    /**
     * Bundle of the storage under test plus the query-path mocks a test needs
     * to stub results and verify SQL/parameter binding.
     */
    private record QueryStorageContext(DorisDataStorage storage, Connection queryConnection,
                                       PreparedStatement queryPreparedStatement, ResultSet resultSet) {
    }
}
diff --git a/home/docs/start/doris-init.md b/home/docs/start/doris-init.md
new file mode 100644
index 0000000000..78b47732d8
--- /dev/null
+++ b/home/docs/start/doris-init.md
@@ -0,0 +1,222 @@
+---
+id: doris-init
+title: Use Apache Doris to Store Metrics and Logs Data (Optional)
+sidebar_label: Metrics/Logs Store Doris
+---
+
Apache HertzBeat's historical data storage relies on a time-series database. You can choose one of the supported options to install and initialize, or skip installation entirely (⚠️ note: configuring one is strongly recommended for production environments).
+
+> It is recommended to use Greptime as metrics storage.
+
+Apache Doris is an MPP-based real-time analytics database. In HertzBeat, Doris
can be used to store:
+
+- Metrics history data (`hzb_history`)
+- Log data (`hzb_log`)
+
+**⚠️ If you do not configure a time-series database, only the last hour of
historical data is retained.**
+
+> If you already have a Doris cluster, skip directly to the YML configuration
section.
+
+### Install Doris (Optional)
+
+You can deploy Doris by package or Docker. For production, follow the official
deployment guide:
+
+- Doris docs: [Quick
Start](https://doris.apache.org/docs/4.x/gettingStarted/quick-start/)
+
+For HertzBeat integration, ensure at least:
+
+- FE MySQL service port is reachable (default `9030`)
+- FE HTTP service port is reachable (default `8030`)
+
+### Note: Install MySQL JDBC Driver Jar
+
+- Download MySQL JDBC driver jar, for example mysql-connector-java-8.1.0.jar.
[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0](https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0)
+- Copy this jar to the `ext-lib` directory in HertzBeat installation directory.
+- Restart HertzBeat service.
+
+### Prerequisites
+
+1. Doris FE and BE are running normally.
+2. HertzBeat can access:
+ - FE MySQL port (default `9030`) for metadata/query
+ - FE HTTP port (default `8030`) for Stream Load
+3. The configured Doris user has permission to create database/table and
insert/query data.
+
+### Configure Doris in HertzBeat `application.yml`
+
+1. Edit `hertzbeat/config/application.yml`.
+
+ For Docker deployment, mount the config file from host.
+ For package deployment, modify `hertzbeat/config/application.yml` directly.
+
+2. Configure `warehouse.store.doris` (Production Environment Recommended using
Stream Load Mode):
+
+```yaml
+warehouse:
+ store:
+ doris:
+ enabled: true
+ # FE MySQL endpoint
+ url: jdbc:mysql://127.0.0.1:9030
+ username: root
+ password:
+
+ table-config:
+ # Enable dynamic partition for automatic expiration
+ enable-partition: true
+ # HOUR / DAY / MONTH
+ partition-time-unit: DAY
+ # Number of history partitions to keep
+ partition-retention-days: 30
+ # Number of future partitions to pre-create
+ partition-future-days: 3
+ buckets: 8
+ replication-num: 3
+
+ pool-config:
+ minimum-idle: 5
+ maximum-pool-size: 20
+ connection-timeout: 30000
+
+ write-config:
+ # Strongly recommend stream mode in production for high throughput
+ write-mode: stream
+ batch-size: 1000
+ flush-interval: 5
+ stream-load-config:
+ # FE HTTP port
+ http-port: ":8030"
+ timeout: 60
+ max-bytes-per-batch: 10485760
+ # For complex networks (K8s/cross-domain): direct / public / private
+ redirect-policy: ""
+```
+
+### Switching to Stream Load Mode
+
+#### Production Environment Configuration
+
For production deployments, we **strongly recommend using Stream Load mode** to ensure high-performance large-scale writes. Stream Load writes directly to the Doris storage layer, providing significantly higher throughput than JDBC mode.
+
+#### Pre-Switch Checklist
+
+1. **Network Reachability**
+ - Ensure HertzBeat can access Doris FE HTTP port (default `8030`)
+ - If direct connection is not possible, configure BE endpoint labels in
Doris
+
+2. **Special Configuration for Complex Network Scenarios**
+
+ In K8s, cross-domain, or load-balanced environments, Stream Load's redirect
mechanism requires special attention:
+ - FE redirects requests to an available BE, which must be reachable from
HertzBeat
+ - Control returned BE address type via `redirect-policy`:
+ - `direct`: Direct BE IP connection
+ - `public`: Use public IP (cloud environments)
+ - `private`: Use private IP (private networks)
+ - Leave empty to use Doris default policy
+
+ Reference: [Doris Stream Load in Complex
Networks](https://doris.apache.org/zh-CN/docs/4.x/data-operate/import/load-internals/stream-load-in-complex-network)
+
+#### Switching Steps
+
+1. **Modify Configuration File**
+
+ Edit `hertzbeat/config/application.yml` and change `write-mode` to `stream`:
+ ```yaml
+ warehouse:
+ store:
+ doris:
+ write-config:
+ write-mode: stream # Change here: from jdbc to stream
+ stream-load-config:
+ http-port: ":8030"
+ timeout: 60
+ max-bytes-per-batch: 10485760
+ redirect-policy: "" # Configure if complex network
+ ```
+
+2. **Restart HertzBeat Service**
+
+3. **Verify Successful Switch**
+
+Check HertzBeat logs for Stream Load messages
+
+#### Common Switching Questions
+
+**Q: Do I need to rebuild tables after switching?**
+
+A: No. Stream Load and JDBC modes use the same table structure, fully
compatible.
+
+**Q: Will data be lost when switching from JDBC to Stream Load?**
+
+A: No. Both write modes are independent, historical data remains unchanged.
+
+**Q: How do I rollback if Stream Load fails?**
+
A: If a Stream Load write fails, HertzBeat automatically falls back to JDBC mode for that write.
+
+**Q: Still getting timeouts in cross-network setup with redirect-policy
configured?**
+
+A: Possible causes:
+- Returned BE address under current `redirect-policy` setting is unreachable
+- Try different `redirect-policy` values (`direct` / `public` / `private`)
+- Contact Doris admin to verify BE endpoint label configuration
+
+### Parameter Notes
+
+| Parameter | Description |
+| --- | --- |
+| `enabled` | Enable/disable Doris storage |
+| `url` | Doris FE MySQL JDBC endpoint |
+| `table-config.enable-partition` | Enable dynamic partition and automatic
expiration |
+| `table-config.partition-time-unit` | Partition granularity: `HOUR` / `DAY` /
`MONTH` |
+| `table-config.partition-retention-days` | Number of partitions retained |
+| `table-config.partition-future-days` | Number of future partitions
pre-created |
+| `table-config.buckets` | Bucket count for table distribution |
+| `table-config.replication-num` | Replica count |
+| `write-config.write-mode` | `jdbc` or `stream` |
+| `write-config.batch-size` | Write batch size |
+| `write-config.flush-interval` | Flush interval in seconds |
+| `stream-load-config.http-port` | Doris FE HTTP port for Stream Load |
+| `stream-load-config.timeout` | Stream Load timeout in seconds |
+| `stream-load-config.max-bytes-per-batch` | Max bytes per stream-load batch |
+| `stream-load-config.redirect-policy` | Redirect policy for FE->BE endpoint
selection: `direct` / `public` / `private` |
+
+### Restart HertzBeat
+
+After configuration changes, restart HertzBeat to apply Doris storage settings.
+
+### Verify Doris Storage Is Working
+
+1. Check HertzBeat logs for Stream Load success messages.
+2. Verify database and tables:
+
+```sql
+SHOW CREATE TABLE hertzbeat.hzb_history;
+SHOW CREATE TABLE hertzbeat.hzb_log;
+```
+
+3. If partition is enabled, check dynamic partition state:
+
+```sql
+SHOW DYNAMIC PARTITION TABLES FROM hertzbeat;
+SHOW PARTITIONS FROM hertzbeat.hzb_history;
+SHOW PARTITIONS FROM hertzbeat.hzb_log;
+```
+
+### FAQ
+
+1. Do I need to enable partition to use bucket distribution?
+
+ > No. Buckets work with or without dynamic partition. `enable-partition`
only controls dynamic partition and automatic expiration.
+
+2. Can I use Doris for both metrics and logs at the same time?
+
+ > Yes. HertzBeat writes metrics into `hzb_history` and logs into `hzb_log`
with the same Doris datasource configuration.
+
+3. If I change partition/bucket settings in `application.yml`, will existing
tables auto-update?
+
+ > No. Existing Doris table DDL is not automatically altered. For
schema-level changes, apply DDL manually or recreate tables.
+
+4. Is stream load compression enabled?
+
+ > Current implementation uses JSON stream load by default.
diff --git
a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md
new file mode 100644
index 0000000000..9c2c072ef9
--- /dev/null
+++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md
@@ -0,0 +1,221 @@
+---
+id: doris-init
+title: 依赖时序数据库服务 Doris 安装初始化(可选)
+sidebar_label: 指标/日志数据存储 Doris
+---
+
+Apache HertzBeat™ 的历史数据存储依赖时序数据库,任选其一安装初始化即可,也可不安装(注意⚠️但强烈建议生产环境配置)。
+
+> 我们推荐使用并长期支持 Greptime 作为存储。
+
+Apache Doris 是一款面向实时分析场景的 MPP 数据库。在 HertzBeat 中,Doris 可用于同时存储:
+
+- 指标历史数据(`hzb_history`)
+- 日志数据(`hzb_log`)
+
+**⚠️ 若不配置时序数据库,则只会保留最近一小时历史数据。**
+
+> 如果您已有 Doris 环境,可直接跳到 YML 配置步骤。
+
+### 安装 Doris(可选)
+
+你可以通过安装包或 Docker 部署 Doris。生产环境建议参考官方部署文档:
+
+- Doris 官方文档:[Quick Start](https://doris.apache.org/docs/4.x/gettingStarted/quick-start/)
+
+对 HertzBeat 接入来说,至少需要保证:
+
+- FE MySQL 服务端口可访问(默认 `9030`)
+- FE HTTP 服务端口可访问(默认 `8030`)
+
+### 注意,必须添加 MySQL JDBC 驱动 jar
+
+- 下载 MySQL JDBC 驱动 jar,例如 mysql-connector-java-8.1.0.jar。[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0](https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0)
+- 将此 jar 包拷贝放入 HertzBeat 的安装目录下的 `ext-lib` 目录下。
+- 重启 HertzBeat 服务。
+
+### 前置检查
+
+1. Doris FE、BE 节点状态正常。
+2. HertzBeat 到 Doris 网络可达:
+ - FE MySQL 端口(默认 `9030`),用于建库建表、查询
+ - FE HTTP 端口(默认 `8030`),用于 Stream Load 写入
+3. 配置的 Doris 用户具有建库建表、写入、查询权限。
+
+### 在 HertzBeat `application.yml` 中配置 Doris
+
+1. 修改 `hertzbeat/config/application.yml`。
+
+ Docker 容器部署需将配置文件挂载到主机本地,安装包部署直接修改解压目录下配置文件即可。
+
+2. 配置 `warehouse.store.doris`(生产环境推荐配置使用 Stream Load 模式):
+
+```yaml
+warehouse:
+ store:
+ doris:
+ enabled: true
+ # Doris FE MySQL 连接地址
+ url: jdbc:mysql://127.0.0.1:9030
+ username: root
+ password:
+
+ table-config:
+ # 开启动态分区(用于自动过期)
+ enable-partition: true
+ # 支持 HOUR / DAY / MONTH
+ partition-time-unit: DAY
+ # 历史分区保留数量
+ partition-retention-days: 30
+ # 预创建未来分区数量
+ partition-future-days: 3
+ buckets: 8
+ replication-num: 3
+
+ pool-config:
+ minimum-idle: 5
+ maximum-pool-size: 20
+ connection-timeout: 30000
+
+ write-config:
+ # 生产环境强烈推荐使用 stream 模式以获得高吞吐性能
+ write-mode: stream
+ batch-size: 1000
+ flush-interval: 5
+ stream-load-config:
+ # Doris FE HTTP 端口
+ http-port: ":8030"
+ timeout: 60
+ max-bytes-per-batch: 10485760
+ # 复杂网络环境(K8s/跨域)需配置:direct / public / private
+ redirect-policy: ""
+```
+
+### 切换到 Stream Load 模式
+
+#### 生产环境推荐配置
+
+在生产环境部署时,**强烈推荐使用 Stream Load 模式** 以保证大规模写入性能。Stream Load 直接写入 Doris 存储层,相比 JDBC 模式可提供更高的吞吐量。
+
+#### 切换前的前置检查
+
+1. **网络可达性**
+ - 确保 HertzBeat 能访问 Doris FE HTTP 端口(默认 `8030`)
+ - 若无法直连,需在 Doris 侧配置 BE 公网/内网地址标签
+
+2. **复杂网络场景的特殊配置**
+
+ 在 K8s、跨域、负载均衡等环境下,Stream Load 的重定向机制需要特别注意:
+ - FE 会将请求重定向到某个可用的 BE,该 BE 地址必须对 HertzBeat 可达
+ - 通过配置 `redirect-policy` 来控制 FE 返回的 BE 地址类型:
+ - `direct`:直连 BE IP
+ - `public`:使用公网地址(云环境)
+ - `private`:使用内网地址(私有网络)
+ - 留空则使用 Doris 默认策略
+
+    参考官方文档:[Doris Stream Load 复杂网络原理](https://doris.apache.org/zh-CN/docs/4.x/data-operate/import/load-internals/stream-load-in-complex-network)
+
+#### 切换步骤
+
+1. **修改配置文件**
+
+ 编辑 `hertzbeat/config/application.yml`,将 `write-mode` 改为 `stream`:
+ ```yaml
+ warehouse:
+ store:
+ doris:
+ write-config:
+ write-mode: stream # 改这里:从 jdbc 改为 stream
+ stream-load-config:
+ http-port: ":8030"
+ timeout: 60
+ max-bytes-per-batch: 10485760
+ redirect-policy: "" # 复杂网络需配置这里
+ ```
+
+2. **重启 HertzBeat 服务**
+
+3. **验证切换成功**
+
+查看 HertzBeat 日志,应出现 Stream Load 相关日志
+
+#### 常见切换问题
+
+**Q: 切换后需要重建表吗?**
+
+A: 不需要。Stream Load 和 JDBC 模式使用同一套表结构,数据完全兼容。
+
+**Q: 从 JDBC 切换到 Stream Load 会丢数据吗?**
+
+A: 不会。两种模式的写入是独立的,历史数据保持不变。
+
+**Q: Stream Load 失败了怎么回退?**
+
+A: 无需手动回退。如果 Stream Load 写入失败,HertzBeat 会自动回退到 JDBC 模式进行写入。
+
+**Q: 跨网络环境配置了 redirect-policy 仍然超时?**
+
+A: 可能原因:
+- 在当前 `redirect-policy` 设置下,返回的 BE 地址仍不可达
+- 尝试其他 `redirect-policy` 值(`direct` / `public` / `private`)
+- 联系 Doris 管理员确认 BE 节点的地址标签配置是否正确
+
+### 参数说明
+
+| 参数 | 说明 |
+| --- | --- |
+| `enabled` | 是否启用 Doris 存储 |
+| `url` | Doris FE MySQL JDBC 地址 |
+| `table-config.enable-partition` | 是否启用动态分区与自动过期 |
+| `table-config.partition-time-unit` | 分区时间粒度:`HOUR` / `DAY` / `MONTH` |
+| `table-config.partition-retention-days` | 历史分区保留数量 |
+| `table-config.partition-future-days` | 未来分区预创建数量 |
+| `table-config.buckets` | 分桶数量 |
+| `table-config.replication-num` | 副本数量 |
+| `write-config.write-mode` | 写入模式:`jdbc` 或 `stream` |
+| `write-config.batch-size` | 单批写入大小 |
+| `write-config.flush-interval` | 刷新间隔(秒) |
+| `stream-load-config.http-port` | Stream Load 使用的 FE HTTP 端口 |
+| `stream-load-config.timeout` | Stream Load 超时时间(秒) |
+| `stream-load-config.max-bytes-per-batch` | 单批最大字节数 |
+| `stream-load-config.redirect-policy` | FE->BE 地址返回策略:`direct` / `public` / `private` |
+
+### 重启 HertzBeat
+
+完成配置后,重启 HertzBeat 使配置生效。
+
+### 验证 Doris 存储是否生效
+
+1. 查看 HertzBeat 日志,确认出现 Stream Load 成功日志。
+2. 在 Doris 中检查建表是否完成:
+
+```sql
+SHOW CREATE TABLE hertzbeat.hzb_history;
+SHOW CREATE TABLE hertzbeat.hzb_log;
+```
+
+3. 若启用了动态分区,检查分区调度状态:
+
+```sql
+SHOW DYNAMIC PARTITION TABLES FROM hertzbeat;
+SHOW PARTITIONS FROM hertzbeat.hzb_history;
+SHOW PARTITIONS FROM hertzbeat.hzb_log;
+```
+
+### 常见问题
+
+1. 不开启分区还能分桶吗?
+
+ > 可以。分桶与是否开启动态分区无强依赖,`enable-partition` 主要影响动态分区和自动过期能力。
+
+2. Doris 能否同时存储指标和日志?
+
+ > 可以。HertzBeat 会将指标写入 `hzb_history`,日志写入 `hzb_log`,共用同一 Doris 数据源配置。
+
+3. 修改 `application.yml` 的分区/分桶参数后,旧表会自动更新吗?
+
+ > 不会。已存在表的 DDL 不会自动变更。需要手动执行 DDL 或重建表。
+
+4. 当前是否启用了 Stream Load 压缩?
+
+ > 当前实现默认使用 JSON Stream Load。
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]