This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new b3dba8b3cf [feat] Support Apache Doris as time-series storage for both 
metrics and logs (#4031)
b3dba8b3cf is described below

commit b3dba8b3cf814740688caf9f8f03e069389c541a
Author: LunaRain_079 <[email protected]>
AuthorDate: Mon Mar 2 23:02:51 2026 +0800

    [feat] Support Apache Doris as time-series storage for both metrics and 
logs (#4031)
---
 .../src/main/resources/application.yml             |   32 +
 hertzbeat-warehouse/pom.xml                        |   13 +
 .../warehouse/constants/WarehouseConstants.java    |    2 +
 .../store/history/tsdb/doris/DorisDataStorage.java | 1244 ++++++++++++++++++++
 .../store/history/tsdb/doris/DorisMetricRow.java   |   46 +
 .../store/history/tsdb/doris/DorisProperties.java  |  235 ++++
 .../history/tsdb/doris/DorisStreamLoadWriter.java  |  603 ++++++++++
 .../history/tsdb/doris/DorisDataStorageTest.java   |  314 +++++
 home/docs/start/doris-init.md                      |  222 ++++
 .../current/start/doris-init.md                    |  221 ++++
 10 files changed, 2932 insertions(+)

diff --git a/hertzbeat-startup/src/main/resources/application.yml 
b/hertzbeat-startup/src/main/resources/application.yml
index 85ff831d59..17a70b2c08 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -225,6 +225,38 @@ warehouse:
       password: root
       expire-time: '30d'
       replication: 1
+    doris:
+      enabled: false
+      url: jdbc:mysql://127.0.0.1:9030
+      username: root
+      password:
+      table-config:
+        enable-partition: false
+        partition-time-unit: DAY
+        partition-retention-days: 30
+        partition-future-days: 3
+        buckets: 8
+        replication-num: 1
+      pool-config:
+        minimum-idle: 5
+        maximum-pool-size: 20
+        connection-timeout: 30000
+      write-config:
+        # Write mode: jdbc (default, suitable for small/medium scale) or 
stream (high throughput)
+        write-mode: jdbc
+        # JDBC mode: batch size and flush interval
+        batch-size: 1000
+        flush-interval: 5
+        # Stream Load mode configuration (only used when write-mode: stream)
+        stream-load-config:
+          # Doris FE HTTP port for Stream Load API
+          http-port: :8030
+          # Stream load timeout in seconds
+          timeout: 60
+          # Max batch size in bytes for stream load (10MB default)
+          max-bytes-per-batch: 10485760
+          # Redirect policy in complex networks: direct/public/private, empty 
means Doris default
+          redirect-policy: ""
   # store real-time metrics data, enable only one below
   real-time:
     memory:
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index b2d5e2ac53..e65b30d536 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -119,6 +119,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- doris -->
+        <dependency>
+          <groupId>com.mysql</groupId>
+          <artifactId>mysql-connector-j</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>com.zaxxer</groupId>
+          <artifactId>HikariCP</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </dependency>
         <!-- kafka -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
index 45ad3f038f..db73381be6 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/constants/WarehouseConstants.java
@@ -48,6 +48,8 @@ public interface WarehouseConstants {
         String QUEST_DB = "questdb";
 
         String DUCKDB = "duckdb";
+
+        String DORIS = "doris";
     }
 
     /**
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
new file mode 100644
index 0000000000..22bcd46827
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
+import 
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Apache Doris data storage.
+ * Supports two write modes:
+ * - jdbc: Traditional JDBC batch insert (default, suitable for small to 
medium scale)
+ * - stream: HTTP Stream Load API (high throughput, suitable for large scale)
+ */
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled", 
havingValue = "true")
+@Slf4j
+public class DorisDataStorage extends AbstractHistoryDataStorage {
+
+    private static final String DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
+    private static final String DATABASE_NAME = "hertzbeat";
+    private static final String TABLE_NAME = "hzb_history";
+    private static final String LOG_TABLE_NAME = "hzb_log";
+    private static final String WRITE_MODE_JDBC = "jdbc";
+    private static final String WRITE_MODE_STREAM = "stream";
+
+    /**
+     * Metric type constants
+     */
+    private static final byte METRIC_TYPE_NUMBER = 1;
+    private static final byte METRIC_TYPE_STRING = 2;
+    private static final byte METRIC_TYPE_TIME = 3;
+
+    private static final String LABEL_KEY_START_TIME = "start";
+    private static final String LABEL_KEY_END_TIME = "end";
+    private static final int MAX_QUERY_LIMIT = 20000;
+    private static final int LOG_BATCH_SIZE = 1000;
+    private static final long NANOS_PER_MILLISECOND = 1_000_000L;
+    private static final long MAX_WAIT_MS = 500L;
+    private static final int MAX_RETRIES = 3;
+    private static final String METRIC_BLOOM_FILTER_COLUMNS = 
"instance,app,metrics,metric";
+    private static final String LOG_BLOOM_FILTER_COLUMNS = 
"trace_id,span_id,severity_number,severity_text";
+    private static final String CREATE_METRIC_TABLE_HEADER_TEMPLATE = """
+            CREATE TABLE IF NOT EXISTS %s (
+                instance                VARCHAR(128)   COMMENT 'Monitor 
instance address',
+                app                     VARCHAR(64)    COMMENT 'Monitor 
application type',
+                metrics                 VARCHAR(128)   COMMENT 'Metrics set 
name',
+                metric                  VARCHAR(128)   COMMENT 'Metric name',
+                record_time             DATETIME       COMMENT 'Record time',
+                metric_type             TINYINT        COMMENT 'Metric type: 
1-number, 2-string, 3-time',
+                int32_value             INT            COMMENT 'Integer value',
+                double_value            DOUBLE         COMMENT 'Double value',
+                str_value               VARCHAR(65533) COMMENT 'String value',
+                labels                  VARCHAR(%d)  COMMENT 'Labels JSON'
+            ) DUPLICATE KEY(instance, app, metrics, metric, record_time)
+            """;
+    private static final String CREATE_LOG_TABLE_HEADER_TEMPLATE = """
+            CREATE TABLE IF NOT EXISTS %s (
+                time_unix_nano          BIGINT         COMMENT 'event unix 
time in nanoseconds',
+                trace_id                VARCHAR(64)    COMMENT 'trace id',
+                span_id                 VARCHAR(32)    COMMENT 'span id',
+                event_time              DATETIME       COMMENT 'event time for 
partition and query',
+                observed_time_unix_nano BIGINT         COMMENT 'observed unix 
time in nanoseconds',
+                severity_number         INT            COMMENT 'severity 
number',
+                severity_text           VARCHAR(32)    COMMENT 'severity text',
+                body                    VARCHAR(65533) COMMENT 'log body json',
+                trace_flags             INT            COMMENT 'trace flags',
+                attributes              VARCHAR(65533) COMMENT 'log attributes 
json',
+                resource                VARCHAR(65533) COMMENT 'resource json',
+                instrumentation_scope   VARCHAR(%d) COMMENT 'instrumentation 
scope json',
+                dropped_attributes_count INT           COMMENT 'dropped 
attributes count'
+            ) DUPLICATE KEY(time_unix_nano, trace_id, span_id, event_time)
+            """;
+    private static final String CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE = """
+            PARTITION BY RANGE(%s) ()
+            DISTRIBUTED BY HASH(%s) BUCKETS %d
+            PROPERTIES (
+                "replication_num" = "%d",
+                "bloom_filter_columns" = "%s",
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "%s",
+                "dynamic_partition.end" = "%d",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "%d",
+                "dynamic_partition.history_partition_num" = "%d"
+            )
+            """;
+    private static final String CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE = 
"""
+            DISTRIBUTED BY HASH(%s) BUCKETS %d
+            PROPERTIES (
+                "replication_num" = "%d",
+                "bloom_filter_columns" = "%s"
+            )
+            """;
+
+    private final DorisProperties properties;
+    private HikariDataSource dataSource;
+
+    private final BlockingQueue<DorisMetricRow> metricsBufferQueue;
+    private final DorisProperties.WriteConfig writeConfig;
+    private final WarehouseWorkerPool warehouseWorkerPool;
+    private String writeMode;
+    private final AtomicBoolean draining = new AtomicBoolean(false);
+    private volatile boolean flushThreadRunning = true;
+    private volatile boolean flushTaskStarted;
+    private final CountDownLatch flushTaskStopped = new CountDownLatch(1);
+
+    // Stream Load writer (only used when writeMode is "stream")
+    private DorisStreamLoadWriter streamLoadWriter;
+    private DorisStreamLoadWriter logStreamLoadWriter;
+
+    public DorisDataStorage(DorisProperties dorisProperties, 
WarehouseWorkerPool warehouseWorkerPool) {
+        if (dorisProperties == null) {
+            log.error("[Doris] Init error, please config Warehouse Doris props 
in application.yml");
+            throw new IllegalArgumentException("please config Warehouse Doris 
props");
+        }
+        if (warehouseWorkerPool == null) {
+            throw new IllegalArgumentException("please config 
WarehouseWorkerPool bean");
+        }
+        this.properties = dorisProperties;
+        this.warehouseWorkerPool = warehouseWorkerPool;
+        this.writeConfig = dorisProperties.writeConfig();
+        this.writeMode = writeConfig.writeMode();
+        this.metricsBufferQueue = new 
LinkedBlockingQueue<>(writeConfig.batchSize() * 10);
+
+        serverAvailable = initDorisConnection();
+        if (serverAvailable) {
+            // Initialize Stream Load writer if using stream mode
+            if (WRITE_MODE_STREAM.equals(writeMode)) {
+                initStreamLoadWriter();
+            }
+            startFlushThread();
+        }
+
+        log.info("[Doris] Initialized with write mode: {}", writeMode);
+    }
+
+    /**
+     * Initialize Doris connection and table in one go.
+     */
+    private boolean initDorisConnection() {
+        try {
+            Class.forName(DRIVER_NAME);
+
+            DorisProperties.PoolConfig poolConfig = properties.poolConfig();
+            String baseUrl = properties.url();
+
+            // Step 1: First connect without database to create it if needed
+            try (Connection initConn = java.sql.DriverManager.getConnection(
+                    baseUrl, properties.username(), properties.password());
+                 Statement stmt = initConn.createStatement()) {
+                // Create database if not exists
+                stmt.execute("CREATE DATABASE IF NOT EXISTS " + DATABASE_NAME);
+                log.info("[Doris] Database {} ensured", DATABASE_NAME);
+            }
+
+            // Step 2: Build URL with database
+            String urlWithDb = baseUrl.endsWith("/") ? baseUrl + DATABASE_NAME 
: baseUrl + "/" + DATABASE_NAME;
+
+            // Step 3: Create connection pool with database
+            HikariConfig config = getHikariConfig(urlWithDb, poolConfig);
+
+            this.dataSource = new HikariDataSource(config);
+
+            // Step 4: Create table
+            try (Connection conn = dataSource.getConnection();
+                 Statement stmt = conn.createStatement()) {
+                DorisProperties.TableConfig tableConfig = 
properties.tableConfig();
+
+                String metricTableHeader = CREATE_METRIC_TABLE_HEADER_TEMPLATE
+                    .formatted(TABLE_NAME, tableConfig.strColumnMaxLength());
+                String metricTableSuffix = tableConfig.enablePartition()
+                    ? CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE.formatted(
+                    "record_time",
+                    "instance, app, metrics",
+                    tableConfig.buckets(),
+                    tableConfig.replicationNum(),
+                    METRIC_BLOOM_FILTER_COLUMNS,
+                    tableConfig.partitionTimeUnit(),
+                    tableConfig.partitionFutureDays(),
+                    tableConfig.buckets(),
+                    tableConfig.partitionRetentionDays())
+                    : CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE.formatted(
+                    "instance, app, metrics",
+                    tableConfig.buckets(),
+                    tableConfig.replicationNum(),
+                    METRIC_BLOOM_FILTER_COLUMNS);
+
+                String logTableHeader = CREATE_LOG_TABLE_HEADER_TEMPLATE
+                    .formatted(LOG_TABLE_NAME, 
tableConfig.strColumnMaxLength());
+                String logTableSuffix = tableConfig.enablePartition()
+                    ? CREATE_TABLE_PARTITION_SUFFIX_TEMPLATE.formatted(
+                    "event_time",
+                    "time_unix_nano",
+                    tableConfig.buckets(),
+                    tableConfig.replicationNum(),
+                    LOG_BLOOM_FILTER_COLUMNS,
+                    tableConfig.partitionTimeUnit(),
+                    tableConfig.partitionFutureDays(),
+                    tableConfig.buckets(),
+                    tableConfig.partitionRetentionDays())
+                    : CREATE_TABLE_NON_PARTITION_SUFFIX_TEMPLATE.formatted(
+                    "time_unix_nano",
+                    tableConfig.buckets(),
+                    tableConfig.replicationNum(),
+                    LOG_BLOOM_FILTER_COLUMNS);
+
+                stmt.execute(metricTableHeader + metricTableSuffix);
+                log.info("[Doris] Table {} ensured", TABLE_NAME);
+                stmt.execute(logTableHeader + logTableSuffix);
+                log.info("[Doris] Table {} ensured", LOG_TABLE_NAME);
+            }
+
+            log.info("[Doris] Initialized: {}", urlWithDb);
+            return true;
+        } catch (ClassNotFoundException e) {
+            log.error("[Doris] MySQL JDBC driver not found.", e);
+        } catch (Exception e) {
+            log.error("[Doris] Failed to initialize: {}", e.getMessage(), e);
+        }
+        return false;
+    }
+
+    @NotNull
+    private HikariConfig getHikariConfig(String urlWithDb, 
DorisProperties.PoolConfig poolConfig) {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(urlWithDb);
+        config.setUsername(properties.username());
+        config.setPassword(properties.password());
+        config.setDriverClassName(DRIVER_NAME);
+        config.setMinimumIdle(poolConfig.minimumIdle());
+        config.setMaximumPoolSize(poolConfig.maximumPoolSize());
+        config.setConnectionTimeout(poolConfig.connectionTimeout());
+        config.setMaxLifetime(poolConfig.maxLifetime());
+        config.setIdleTimeout(poolConfig.idleTimeout());
+        config.setConnectionTestQuery("SELECT 1");
+        config.setPoolName("Doris-HikariCP");
+        return config;
+    }
+
+    /**
+     * Initialize Stream Load writer
+     */
+    private void initStreamLoadWriter() {
+        try {
+            this.streamLoadWriter = new DorisStreamLoadWriter(
+                    DATABASE_NAME, TABLE_NAME, properties.url(),
+                    properties.username(), properties.password(),
+                    writeConfig.streamLoadConfig()
+            );
+            if (streamLoadWriter.isAvailable()) {
+                log.info("[Doris] Stream Load writer initialized.");
+            } else {
+                log.warn("[Doris] Stream Load writer failed, fallback to 
JDBC.");
+                this.writeMode = WRITE_MODE_JDBC;
+            }
+
+            this.logStreamLoadWriter = new DorisStreamLoadWriter(
+                    DATABASE_NAME, LOG_TABLE_NAME, properties.url(),
+                    properties.username(), properties.password(),
+                    writeConfig.streamLoadConfig()
+            );
+            if (logStreamLoadWriter.isAvailable()) {
+                log.info("[Doris] Log Stream Load writer initialized.");
+            } else {
+                log.warn("[Doris] Log Stream Load writer unavailable, log 
writes fallback to JDBC.");
+                this.logStreamLoadWriter = null;
+            }
+        } catch (Exception e) {
+            log.error("[Doris] Stream Load init failed: {}", e.getMessage(), 
e);
+            this.writeMode = WRITE_MODE_JDBC;
+        }
+    }
+
+    /**
+     * Start the background flush task.
+     */
+    private void startFlushThread() {
+        try {
+            warehouseWorkerPool.executeJob(() -> {
+                flushTaskStarted = true;
+                try {
+                    while (flushThreadRunning || 
!metricsBufferQueue.isEmpty()) {
+                        try {
+                            List<DorisMetricRow> batch = new 
ArrayList<>(writeConfig.batchSize());
+
+                            // Wait for data or timeout
+                            DorisMetricRow first = 
metricsBufferQueue.poll(writeConfig.flushInterval(), TimeUnit.SECONDS);
+                            if (first != null) {
+                                batch.add(first);
+                                // Drain remaining items up to batch size
+                                metricsBufferQueue.drainTo(batch, 
writeConfig.batchSize() - 1);
+                            }
+
+                            if (!batch.isEmpty()) {
+                                doSaveData(batch);
+                                log.debug("[Doris] Flushed {} metrics items", 
batch.size());
+                            }
+
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            log.debug("[Doris] Flush task interrupted");
+                            break;
+                        } catch (Exception e) {
+                            log.error("[Doris] Flush task error: {}", 
e.getMessage(), e);
+                        }
+                    }
+                    log.info("[Doris] Flush task stopped");
+                } finally {
+                    flushTaskStopped.countDown();
+                }
+            });
+            log.info("[Doris] Started metrics flush task with interval {} 
seconds", writeConfig.flushInterval());
+        } catch (RejectedExecutionException e) {
+            log.error("[Doris] Failed to start flush task from 
WarehouseWorkerPool", e);
+        }
+    }
+
+    private String normalizeApp(String app) {
+        if (app != null && 
app.startsWith(CommonConstants.PROMETHEUS_APP_PREFIX)) {
+            return CommonConstants.PROMETHEUS;
+        }
+        return app;
+    }
+
+    @Override
+    public void saveData(CollectRep.MetricsData metricsData) {
+        if (!isServerAvailable() || metricsData.getCode() != 
CollectRep.Code.SUCCESS) {
+            return;
+        }
+        if (metricsData.getValues().isEmpty()) {
+            log.info("[Doris] Flush metrics data {} {} is null, ignore.", 
metricsData.getId(),
+                    metricsData.getMetrics());
+            return;
+        }
+
+        String instance = metricsData.getInstance();
+        String app = normalizeApp(metricsData.getApp());
+        String metrics = metricsData.getMetrics();
+        long timestamp = metricsData.getTime();
+
+        Map<String, String> customLabels = metricsData.getLabels();
+
+        List<DorisMetricRow> rows = new ArrayList<>();
+
+        try {
+            RowWrapper rowWrapper = metricsData.readRow();
+            while (rowWrapper.hasNextRow()) {
+                rowWrapper = rowWrapper.nextRow();
+
+                Map<String, String> rowLabels = new HashMap<>();
+
+                // Add custom labels
+                if (customLabels != null && !customLabels.isEmpty()) {
+                    customLabels.forEach((k, v) -> rowLabels.put(k, 
String.valueOf(v)));
+                }
+
+                var cells = rowWrapper.cellStream().toList();
+
+                // Collect all labels in this row first, then build metrics 
rows.
+                for (var cell : cells) {
+                    String value = cell.getValue();
+                    if (CommonConstants.NULL_VALUE.equals(value)) {
+                        continue;
+                    }
+                    boolean isLabel = 
cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
+                    if (!isLabel) {
+                        continue;
+                    }
+                    String fieldName = cell.getField().getName();
+                    rowLabels.put(fieldName, value);
+                }
+
+                for (var cell : cells) {
+                    String value = cell.getValue();
+                    if (CommonConstants.NULL_VALUE.equals(value)) {
+                        continue;
+                    }
+
+                    boolean isLabel = 
cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
+                    if (isLabel) {
+                        continue;
+                    }
+                    byte type = 
cell.getMetadataAsByte(MetricDataConstants.TYPE);
+                    String fieldName = cell.getField().getName();
+
+                    // Create a metric row for each non-label field.
+                    DorisMetricRow row = new DorisMetricRow();
+                    row.instance = instance;
+                    row.app = app;
+                    row.metrics = metrics;
+                    row.metric = fieldName;
+                    row.recordTime = new Timestamp(timestamp);
+                    row.labels = JsonUtil.toJson(rowLabels);
+
+                    if (type == CommonConstants.TYPE_NUMBER) {
+                        row.metricType = METRIC_TYPE_NUMBER;
+                        try {
+                            row.doubleValue = Double.parseDouble(value);
+                        } catch (NumberFormatException e) {
+                            log.debug("[Doris] Failed to parse number value: 
{}", value);
+                            continue;
+                        }
+                    } else if (type == CommonConstants.TYPE_STRING) {
+                        row.metricType = METRIC_TYPE_STRING;
+                        row.strValue = value;
+                    } else if (type == CommonConstants.TYPE_TIME) {
+                        row.metricType = METRIC_TYPE_TIME;
+                        try {
+                            row.int32Value = Integer.parseInt(value);
+                        } catch (NumberFormatException e) {
+                            log.debug("[Doris] Failed to parse time value: 
{}", value);
+                            continue;
+                        }
+                    } else {
+                        row.metricType = METRIC_TYPE_STRING;
+                        row.strValue = value;
+                    }
+
+                    rows.add(row);
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("[Doris] Error processing metrics data: {}", 
e.getMessage(), e);
+            return;
+        }
+
+        if (rows.isEmpty()) {
+            log.debug("[Doris] No valid metrics data to save for {} {}", app, 
metrics);
+            return;
+        }
+
+        sendToBuffer(rows);
+    }
+
+    /**
+     * Send metrics to buffer queue
+     */
+    private void sendToBuffer(List<DorisMetricRow> rows) {
+        for (int idx = 0; idx < rows.size(); idx++) {
+            DorisMetricRow row = rows.get(idx);
+            boolean offered = false;
+            int retryCount = 0;
+            while (!offered && retryCount < MAX_RETRIES) {
+                try {
+                    offered = metricsBufferQueue.offer(row, MAX_WAIT_MS, 
TimeUnit.MILLISECONDS);
+                    if (!offered) {
+                        if (retryCount == 0 && draining.compareAndSet(false, 
true)) {
+                            log.debug("[Doris] Buffer queue is full, 
triggering immediate flush");
+                            triggerImmediateFlush();
+                        }
+                        retryCount++;
+                        if (retryCount < MAX_RETRIES) {
+                            Thread.sleep(100L * retryCount);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    log.error("[Doris] Interrupted while offering metrics to 
buffer queue", e);
+                    break;
+                }
+            }
+            if (!offered) {
+                log.warn("[Doris] Failed to add metrics to buffer after {} 
retries, saving directly", MAX_RETRIES);
+                // Save only rows that are not successfully queued yet to 
avoid duplicate writes.
+                List<DorisMetricRow> remainingRows = new 
ArrayList<>(rows.subList(idx, rows.size()));
+                doSaveData(remainingRows);
+                return;
+            }
+        }
+
+        // Trigger early flush if buffer is almost full
+        if (metricsBufferQueue.size() >= writeConfig.batchSize() * 8 && 
draining.compareAndSet(false, true)) {
+            triggerImmediateFlush();
+        }
+    }
+
+    /**
+     * Trigger immediate flush
+     */
+    private void triggerImmediateFlush() {
+        List<DorisMetricRow> batch = new ArrayList<>(writeConfig.batchSize());
+        metricsBufferQueue.drainTo(batch, writeConfig.batchSize());
+        if (batch.isEmpty()) {
+            draining.set(false);
+            return;
+        }
+        try {
+            warehouseWorkerPool.executeJob(() -> {
+                try {
+                    doSaveData(batch);
+                } finally {
+                    draining.set(false);
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            try {
+                log.warn("[Doris] Immediate flush task rejected by 
WarehouseWorkerPool, fallback to sync flush");
+                doSaveData(batch);
+            } finally {
+                draining.set(false);
+            }
+        }
+    }
+
+    /**
+     * Actually save data to Doris
+     * Uses either JDBC batch insert or Stream Load based on writeMode 
configuration
+     */
+    private void doSaveData(List<DorisMetricRow> rows) {
+        if (rows == null || rows.isEmpty()) {
+            return;
+        }
+
+        if (WRITE_MODE_STREAM.equals(writeMode) && streamLoadWriter != null) {
+            boolean success = streamLoadWriter.write(rows);
+            if (!success) {
+                if (writeConfig.fallbackToJdbcOnFailure()) {
+                    log.warn("[Doris] Stream Load failed, 
fallbackToJdbcOnFailure=true, use JDBC for this batch");
+                    doSaveDataJdbc(rows);
+                } else {
+                    log.error("[Doris] Stream Load failed and JDBC fallback is 
disabled. rows={}", rows.size());
+                }
+            }
+        } else {
+            // Use JDBC batch insert (default)
+            doSaveDataJdbc(rows);
+        }
+    }
+
+    /**
+     * Save data using JDBC batch insert
+     */
+    private void doSaveDataJdbc(List<DorisMetricRow> rows) {
+        if (rows == null || rows.isEmpty()) {
+            return;
+        }
+
+        String insertSql = """
+                INSERT INTO %s.%s (instance, app, metrics, metric, 
metric_type, int32_value, double_value, str_value, record_time, labels)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """
+                .formatted(DATABASE_NAME, TABLE_NAME);
+
+        try (Connection conn = dataSource.getConnection()) {
+            conn.setAutoCommit(false);
+
+            try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
+                for (DorisMetricRow row : rows) {
+                    pstmt.setString(1, row.instance);
+                    pstmt.setString(2, row.app);
+                    pstmt.setString(3, row.metrics);
+                    pstmt.setString(4, row.metric);
+                    pstmt.setByte(5, row.metricType);
+
+                    if (row.int32Value != null) {
+                        pstmt.setInt(6, row.int32Value);
+                    } else {
+                        pstmt.setNull(6, java.sql.Types.INTEGER);
+                    }
+
+                    if (row.doubleValue != null) {
+                        pstmt.setDouble(7, row.doubleValue);
+                    } else {
+                        pstmt.setNull(7, java.sql.Types.DOUBLE);
+                    }
+
+                    pstmt.setString(8, row.strValue);
+                    pstmt.setTimestamp(9, row.recordTime);
+                    pstmt.setString(10, row.labels);
+
+                    pstmt.addBatch();
+                }
+
+                pstmt.executeBatch();
+                conn.commit();
+                log.debug("[Doris] Successfully saved {} metrics rows", 
rows.size());
+
+            } catch (SQLException e) {
+                conn.rollback();
+                throw e;
+            }
+        } catch (SQLException e) {
+            log.error("[Doris] Failed to save metrics data: {}", 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryMetricData(String instance, 
String app, String metrics, String metric,
+            String history) {
+        if (!isServerAvailable()) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, Long> timeRange = getTimeRange(history);
+        app = normalizeApp(app);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+
+        return queryMetricData(instance, app, metrics, metric, start, end);
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(String 
instance, String app, String metrics,
+            String metric, String history) {
+        if (!isServerAvailable()) {
+            log.error("""
+
+                    \t---------------Doris Init Failed---------------
+                    \t--------------Please Config Doris--------------
+                    \t----------Can Not Use Metric History Now----------
+                    """);
+            return Collections.emptyMap();
+        }
+
+        Map<String, Long> timeRange = getTimeRange(history);
+        app = normalizeApp(app);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+
+        // First get the basic data
+        Map<String, List<Value>> instanceValuesMap = queryMetricData(instance, 
app, metrics, metric, start, end);
+
+        if (instanceValuesMap.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        // Calculate aggregations for each label group
+        try {
+            queryAndSetAggregations(instanceValuesMap, instance, app, metrics, 
metric, start, end);
+        } catch (Exception e) {
+            log.error("[Doris] Error calculating aggregations: {}", 
e.getMessage(), e);
+        }
+
+        return instanceValuesMap;
+    }
+
+    /**
+     * Query metric data from Doris
+     */
+    private Map<String, List<Value>> queryMetricData(String instance, String 
app, String metrics, String metric,
+            long startTime, long endTime) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>();
+
+        String sql = """
+                SELECT record_time, metric_type, int32_value, double_value, 
str_value, labels
+                FROM %s.%s
+                WHERE instance = ? AND app = ? AND metrics = ? AND metric = ?
+                AND record_time >= ? AND record_time <= ?
+                ORDER BY record_time DESC
+                LIMIT %d
+                """.formatted(DATABASE_NAME, TABLE_NAME, MAX_QUERY_LIMIT);
+
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement pstmt = conn.prepareStatement(sql)) {
+
+            pstmt.setString(1, instance);
+            pstmt.setString(2, app);
+            pstmt.setString(3, metrics);
+            pstmt.setString(4, metric);
+            pstmt.setTimestamp(5, new Timestamp(startTime * 1000));
+            pstmt.setTimestamp(6, new Timestamp(endTime * 1000));
+
+            try (ResultSet rs = pstmt.executeQuery()) {
+                while (rs.next()) {
+                    Timestamp recordTime = rs.getTimestamp("record_time");
+                    byte metricType = rs.getByte("metric_type");
+                    String labels = rs.getString("labels");
+
+                    String valueStr;
+                    if (metricType == METRIC_TYPE_NUMBER) {
+                        double doubleValue = rs.getDouble("double_value");
+                        if (!rs.wasNull()) {
+                            valueStr = BigDecimal.valueOf(doubleValue)
+                                    .setScale(4, RoundingMode.HALF_UP)
+                                    .stripTrailingZeros()
+                                    .toPlainString();
+                        } else {
+                            continue;
+                        }
+                    } else if (metricType == METRIC_TYPE_TIME) {
+                        int intValue = rs.getInt("int32_value");
+                        if (!rs.wasNull()) {
+                            valueStr = String.valueOf(intValue);
+                        } else {
+                            continue;
+                        }
+                    } else {
+                        valueStr = rs.getString("str_value");
+                        if (valueStr == null) {
+                            continue;
+                        }
+                    }
+
+                    String labelKey = labels != null ? labels : "{}";
+                    List<Value> valueList = 
instanceValuesMap.computeIfAbsent(labelKey, k -> new LinkedList<>());
+                    valueList.add(new Value(valueStr, recordTime.getTime()));
+                }
+            }
+
+            log.debug("[Doris] Query returned {} label groups", 
instanceValuesMap.size());
+
+        } catch (SQLException e) {
+            log.error("[Doris] Failed to query metrics data: {}", 
e.getMessage(), e);
+        }
+
+        return instanceValuesMap;
+    }
+
+    /**
+     * Query and set aggregation values (max, min, avg)
+     */
+    private void queryAndSetAggregations(Map<String, List<Value>> 
instanceValuesMap,
+            String instance, String app, String metrics, String metric,
+            long startTime, long endTime) {
+        // Calculate step based on time range
+        long duration = endTime - startTime;
+        int stepSeconds;
+
+        if (duration < Duration.ofDays(1).getSeconds()) {
+            stepSeconds = 60;
+        } else if (duration < Duration.ofDays(7).getSeconds()) {
+            stepSeconds = 3600; // 1 hour
+        } else {
+            stepSeconds = 14400; // 4 hours
+        }
+
+        String aggregationSql = """
+                SELECT
+                    labels,
+                    FLOOR(UNIX_TIMESTAMP(record_time) / %d) * %d AS 
time_bucket,
+                    MAX(double_value) as max_val,
+                    MIN(double_value) as min_val,
+                    AVG(double_value) as avg_val
+                FROM %s.%s
+                WHERE instance = ? AND app = ? AND metrics = ? AND metric = ?
+                AND record_time >= ? AND record_time <= ?
+                AND metric_type = %d
+                GROUP BY labels, time_bucket
+                ORDER BY time_bucket
+                """.formatted(stepSeconds, stepSeconds, DATABASE_NAME, 
TABLE_NAME, METRIC_TYPE_NUMBER);
+
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement pstmt = 
conn.prepareStatement(aggregationSql)) {
+
+            pstmt.setString(1, instance);
+            pstmt.setString(2, app);
+            pstmt.setString(3, metrics);
+            pstmt.setString(4, metric);
+            pstmt.setTimestamp(5, new Timestamp(startTime * 1000));
+            pstmt.setTimestamp(6, new Timestamp(endTime * 1000));
+
+            Map<String, Map<Long, double[]>> labelBucketAggregations = new 
HashMap<>();
+
+            try (ResultSet rs = pstmt.executeQuery()) {
+                while (rs.next()) {
+                    String labels = rs.getString("labels");
+                    long timeBucket = rs.getLong("time_bucket");
+                    double maxVal = rs.getDouble("max_val");
+                    double minVal = rs.getDouble("min_val");
+                    double avgVal = rs.getDouble("avg_val");
+
+                    String labelKey = labels != null ? labels : "{}";
+                    Map<Long, double[]> bucketMap = 
labelBucketAggregations.computeIfAbsent(labelKey,
+                            k -> new HashMap<>());
+                    bucketMap.put(timeBucket * 1000, new double[] { maxVal, 
minVal, avgVal });
+                }
+            }
+
+            // Apply aggregations to values
+            for (Map.Entry<String, List<Value>> entry : 
instanceValuesMap.entrySet()) {
+                String labelKey = entry.getKey();
+                List<Value> values = entry.getValue();
+                Map<Long, double[]> bucketMap = 
labelBucketAggregations.get(labelKey);
+
+                if (bucketMap != null) {
+                    for (Value value : values) {
+                        // Find the matching bucket
+                        long valueBucket = (value.getTime() / (stepSeconds * 
1000L)) * (stepSeconds * 1000L);
+                        double[] aggregations = bucketMap.get(valueBucket);
+                        if (aggregations != null) {
+                            value.setMax(formatDouble(aggregations[0]));
+                            value.setMin(formatDouble(aggregations[1]));
+                            value.setMean(formatDouble(aggregations[2]));
+                        }
+                    }
+                }
+            }
+
+        } catch (SQLException e) {
+            log.error("[Doris] Failed to query aggregations: {}", 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Format double value to string
+     */
+    private String formatDouble(double value) {
+        return BigDecimal.valueOf(value)
+                .setScale(4, RoundingMode.HALF_UP)
+                .stripTrailingZeros()
+                .toPlainString();
+    }
+
+    /**
+     * Get time range based on history parameter
+     */
+    private Map<String, Long> getTimeRange(String history) {
+        Instant now = Instant.now();
+        long start;
+        try {
+            if (NumberUtils.isParsable(history)) {
+                start = NumberUtils.toLong(history);
+                start = ZonedDateTime.now().toEpochSecond() - start;
+            } else {
+                TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                assert temporalAmount != null;
+                Instant dateTime = now.minus(temporalAmount);
+                start = dateTime.getEpochSecond();
+            }
+        } catch (Exception e) {
+            log.error("[Doris] History time error: {}. Using default: 6h", 
e.getMessage());
+            start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+        }
+        long end = now.getEpochSecond();
+        return Map.of(LABEL_KEY_START_TIME, start, LABEL_KEY_END_TIME, end);
+    }
+
+    @Override
+    public void saveLogData(LogEntry logEntry) {
+        if (logEntry == null) {
+            return;
+        }
+        saveLogDataBatch(List.of(logEntry));
+    }
+
+    @Override
+    public void saveLogDataBatch(List<LogEntry> logEntries) {
+        if (!isServerAvailable() || logEntries == null || 
logEntries.isEmpty()) {
+            return;
+        }
+
+        int total = logEntries.size();
+        for (int i = 0; i < total; i += LOG_BATCH_SIZE) {
+            int end = Math.min(i + LOG_BATCH_SIZE, total);
+            List<LogEntry> batch = logEntries.subList(i, end);
+            if (WRITE_MODE_STREAM.equals(writeMode) && logStreamLoadWriter != 
null) {
+                boolean success = logStreamLoadWriter.writeLogs(batch);
+                if (!success) {
+                    if (writeConfig.fallbackToJdbcOnFailure()) {
+                        log.warn("[Doris] Log Stream Load failed, 
fallbackToJdbcOnFailure=true, use JDBC for this batch");
+                        doSaveLogDataJdbc(batch);
+                    } else {
+                        log.error("[Doris] Log Stream Load failed and JDBC 
fallback is disabled. rows={}", batch.size());
+                    }
+                }
+            } else {
+                doSaveLogDataJdbc(batch);
+            }
+        }
+    }
+
+    private void doSaveLogDataJdbc(List<LogEntry> logEntries) {
+        if (logEntries == null || logEntries.isEmpty()) {
+            return;
+        }
+
+        String insertSql = """
+                INSERT INTO %s.%s (
+                time_unix_nano, trace_id, span_id, event_time, 
observed_time_unix_nano,
+                severity_number, severity_text, body, trace_flags, attributes, 
resource,
+                instrumentation_scope, dropped_attributes_count
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """.formatted(DATABASE_NAME, LOG_TABLE_NAME);
+
+        try (Connection conn = dataSource.getConnection()) {
+            conn.setAutoCommit(false);
+            try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
+                for (LogEntry logEntry : logEntries) {
+                    long timeUnixNano = 
normalizeTimeUnixNano(logEntry.getTimeUnixNano());
+                    long observedTimeUnixNano = 
logEntry.getObservedTimeUnixNano() != null
+                        ? logEntry.getObservedTimeUnixNano()
+                        : timeUnixNano;
+
+                    pstmt.setLong(1, timeUnixNano);
+                    pstmt.setString(2, logEntry.getTraceId());
+                    pstmt.setString(3, logEntry.getSpanId());
+                    pstmt.setTimestamp(4, nanosToTimestamp(timeUnixNano));
+                    pstmt.setLong(5, observedTimeUnixNano);
+
+                    if (logEntry.getSeverityNumber() != null) {
+                        pstmt.setInt(6, logEntry.getSeverityNumber());
+                    } else {
+                        pstmt.setNull(6, java.sql.Types.INTEGER);
+                    }
+                    pstmt.setString(7, logEntry.getSeverityText());
+                    pstmt.setString(8, JsonUtil.toJson(logEntry.getBody()));
+                    if (logEntry.getTraceFlags() != null) {
+                        pstmt.setInt(9, logEntry.getTraceFlags());
+                    } else {
+                        pstmt.setNull(9, java.sql.Types.INTEGER);
+                    }
+                    pstmt.setString(10, 
JsonUtil.toJson(logEntry.getAttributes()));
+                    pstmt.setString(11, 
JsonUtil.toJson(logEntry.getResource()));
+                    pstmt.setString(12, 
JsonUtil.toJson(logEntry.getInstrumentationScope()));
+                    if (logEntry.getDroppedAttributesCount() != null) {
+                        pstmt.setInt(13, logEntry.getDroppedAttributesCount());
+                    } else {
+                        pstmt.setNull(13, java.sql.Types.INTEGER);
+                    }
+
+                    pstmt.addBatch();
+                }
+                pstmt.executeBatch();
+                conn.commit();
+            } catch (SQLException e) {
+                conn.rollback();
+                throw e;
+            }
+        } catch (SQLException e) {
+            log.error("[Doris] Failed to save log data: {}", e.getMessage(), 
e);
+        }
+    }
+
+    @Override
+    public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long 
endTime, String traceId,
+                                                        String spanId, Integer 
severityNumber,
+                                                        String severityText, 
String searchContent) {
+        StringBuilder sql = new StringBuilder("""
+                SELECT time_unix_nano, observed_time_unix_nano, 
severity_number, severity_text, body,
+                       trace_id, span_id, trace_flags, attributes, resource, 
instrumentation_scope, dropped_attributes_count
+                FROM %s.%s
+                """.formatted(DATABASE_NAME, LOG_TABLE_NAME));
+        List<Object> params = new ArrayList<>();
+        appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId, 
severityNumber, severityText, searchContent);
+        sql.append(" ORDER BY time_unix_nano DESC");
+        return executeLogQuery(sql.toString(), params, 
"queryLogsByMultipleConditions");
+    }
+
+    @Override
+    public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long 
startTime, Long endTime, String traceId,
+                                                                      String 
spanId, Integer severityNumber,
+                                                                      String 
severityText, String searchContent,
+                                                                      Integer 
offset, Integer limit) {
+        StringBuilder sql = new StringBuilder("""
+                SELECT time_unix_nano, observed_time_unix_nano, 
severity_number, severity_text, body,
+                       trace_id, span_id, trace_flags, attributes, resource, 
instrumentation_scope, dropped_attributes_count
+                FROM %s.%s
+                """.formatted(DATABASE_NAME, LOG_TABLE_NAME));
+        List<Object> params = new ArrayList<>();
+        appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId, 
severityNumber, severityText, searchContent);
+        sql.append(" ORDER BY time_unix_nano DESC");
+        if (limit != null && limit > 0) {
+            sql.append(" LIMIT ?");
+            params.add(limit);
+            if (offset != null && offset > 0) {
+                sql.append(" OFFSET ?");
+                params.add(offset);
+            }
+        }
+        return executeLogQuery(sql.toString(), params, 
"queryLogsByMultipleConditionsWithPagination");
+    }
+
+    @Override
+    public long countLogsByMultipleConditions(Long startTime, Long endTime, 
String traceId,
+                                              String spanId, Integer 
severityNumber,
+                                              String severityText, String 
searchContent) {
+        StringBuilder sql = new StringBuilder("SELECT COUNT(*) AS count FROM 
%s.%s"
+            .formatted(DATABASE_NAME, LOG_TABLE_NAME));
+        List<Object> params = new ArrayList<>();
+        appendLogWhereClause(sql, params, startTime, endTime, traceId, spanId, 
severityNumber, severityText, searchContent);
+
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement pstmt = conn.prepareStatement(sql.toString())) {
+            bindParameters(pstmt, params);
+            try (ResultSet rs = pstmt.executeQuery()) {
+                if (rs.next()) {
+                    return rs.getLong("count");
+                }
+                return 0;
+            }
+        } catch (Exception e) {
+            log.error("[Doris] countLogsByMultipleConditions error: {}", 
e.getMessage(), e);
+            return 0;
+        }
+    }
+
+    @Override
+    public boolean batchDeleteLogs(List<Long> timeUnixNanos) {
+        if (!isServerAvailable() || timeUnixNanos == null || 
timeUnixNanos.isEmpty()) {
+            return false;
+        }
+        List<Long> validTimeUnixNanos = timeUnixNanos.stream()
+            .filter(Objects::nonNull)
+            .toList();
+        if (validTimeUnixNanos.isEmpty()) {
+            return false;
+        }
+
+        String placeholders = String.join(",", 
Collections.nCopies(validTimeUnixNanos.size(), "?"));
+        String sql = "DELETE FROM %s.%s WHERE time_unix_nano IN (%s)"
+            .formatted(DATABASE_NAME, LOG_TABLE_NAME, placeholders);
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            int index = 1;
+            for (Long timeUnixNano : validTimeUnixNanos) {
+                pstmt.setLong(index++, timeUnixNano);
+            }
+            pstmt.executeUpdate();
+            return true;
+        } catch (Exception e) {
+            log.error("[Doris] batchDeleteLogs error: {}", e.getMessage(), e);
+            return false;
+        }
+    }
+
+    private void appendLogWhereClause(StringBuilder sql, List<Object> params,
+                                      Long startTime, Long endTime,
+                                      String traceId, String spanId,
+                                      Integer severityNumber, String 
severityText,
+                                      String searchContent) {
+        List<String> conditions = new ArrayList<>();
+        if (startTime != null) {
+            conditions.add("time_unix_nano >= ?");
+            params.add(msToNs(startTime));
+        }
+        if (endTime != null) {
+            conditions.add("time_unix_nano <= ?");
+            params.add(msToNs(endTime));
+        }
+        if (StringUtils.hasText(traceId)) {
+            conditions.add("trace_id = ?");
+            params.add(traceId);
+        }
+        if (StringUtils.hasText(spanId)) {
+            conditions.add("span_id = ?");
+            params.add(spanId);
+        }
+        if (severityNumber != null) {
+            conditions.add("severity_number = ?");
+            params.add(severityNumber);
+        }
+        if (StringUtils.hasText(severityText)) {
+            conditions.add("severity_text = ?");
+            params.add(severityText);
+        }
+        if (StringUtils.hasText(searchContent)) {
+            conditions.add("body LIKE ?");
+            params.add("%" + searchContent + "%");
+        }
+        if (!conditions.isEmpty()) {
+            sql.append(" WHERE ").append(String.join(" AND ", conditions));
+        }
+    }
+
+    private void bindParameters(PreparedStatement pstmt, List<Object> params) 
throws SQLException {
+        for (int i = 0; i < params.size(); i++) {
+            pstmt.setObject(i + 1, params.get(i));
+        }
+    }
+
+    private List<LogEntry> executeLogQuery(String sql, List<Object> params, 
String queryName) {
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            bindParameters(pstmt, params);
+            try (ResultSet rs = pstmt.executeQuery()) {
+                return mapRowsToLogEntries(rs);
+            }
+        } catch (Exception e) {
+            log.error("[Doris] {} error: {}", queryName, e.getMessage(), e);
+            return List.of();
+        }
+    }
+
+    private List<LogEntry> mapRowsToLogEntries(ResultSet rs) throws 
SQLException {
+        List<LogEntry> entries = new LinkedList<>();
+        while (rs.next()) {
+            String instrumentationScopeStr = 
rs.getString("instrumentation_scope");
+            LogEntry.InstrumentationScope instrumentationScope = null;
+            if (StringUtils.hasText(instrumentationScopeStr)) {
+                instrumentationScope = 
JsonUtil.fromJson(instrumentationScopeStr, LogEntry.InstrumentationScope.class);
+            }
+
+            LogEntry entry = LogEntry.builder()
+                .timeUnixNano(rs.getLong("time_unix_nano"))
+                .observedTimeUnixNano(rs.getLong("observed_time_unix_nano"))
+                .severityNumber(getNullableInteger(rs, "severity_number"))
+                .severityText(rs.getString("severity_text"))
+                .body(parseJsonMaybe(rs.getString("body")))
+                .traceId(rs.getString("trace_id"))
+                .spanId(rs.getString("span_id"))
+                .traceFlags(getNullableInteger(rs, "trace_flags"))
+                
.attributes(castToMap(parseJsonMaybe(rs.getString("attributes"))))
+                .resource(castToMap(parseJsonMaybe(rs.getString("resource"))))
+                .instrumentationScope(instrumentationScope)
+                .droppedAttributesCount(getNullableInteger(rs, 
"dropped_attributes_count"))
+                .build();
+            entries.add(entry);
+        }
+        return entries;
+    }
+
+    private Integer getNullableInteger(ResultSet rs, String columnName) throws 
SQLException {
+        int value = rs.getInt(columnName);
+        return rs.wasNull() ? null : value;
+    }
+
+    private static long msToNs(Long ms) {
+        return ms * NANOS_PER_MILLISECOND;
+    }
+
+    private long normalizeTimeUnixNano(Long timeUnixNano) {
+        return timeUnixNano != null ? timeUnixNano : 
System.currentTimeMillis() * NANOS_PER_MILLISECOND;
+    }
+
+    private Timestamp nanosToTimestamp(long timeUnixNano) {
+        return new Timestamp(timeUnixNano / NANOS_PER_MILLISECOND);
+    }
+
+    private Object parseJsonMaybe(String value) {
+        if (!StringUtils.hasText(value)) {
+            return null;
+        }
+        String trimmed = value.trim();
+        boolean maybeJson = (trimmed.startsWith("{") && trimmed.endsWith("}"))
+            || (trimmed.startsWith("[") && trimmed.endsWith("]"))
+            || (trimmed.startsWith("\"") && trimmed.endsWith("\""));
+        if (!maybeJson) {
+            return trimmed;
+        }
+        Object parsed = JsonUtil.fromJson(trimmed, Object.class);
+        return parsed != null ? parsed : trimmed;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> castToMap(Object value) {
+        if (value instanceof Map<?, ?> map) {
+            return (Map<String, Object>) map;
+        }
+        return null;
+    }
+
+    @Override
+    public void destroy() {
+        log.info("[Doris] Shutting down...");
+
+        flushThreadRunning = false;
+
+        // Wait flush task to finish.
+        if (flushTaskStarted) {
+            try {
+                boolean stopped = flushTaskStopped.await(10000, 
TimeUnit.MILLISECONDS);
+                if (!stopped) {
+                    log.warn("[Doris] Timed out waiting for flush task to 
stop");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        // Flush remaining data
+        List<DorisMetricRow> remaining = new ArrayList<>();
+        metricsBufferQueue.drainTo(remaining);
+        if (!remaining.isEmpty()) {
+            log.info("[Doris] Flushing {} remaining metrics before shutdown", 
remaining.size());
+            doSaveData(remaining);
+        }
+
+        // Close data source
+        if (dataSource != null && !dataSource.isClosed()) {
+            dataSource.close();
+            log.info("[Doris] Connection pool closed.");
+        }
+
+        // Close Stream Load writer
+        if (streamLoadWriter != null) {
+            streamLoadWriter.close();
+            log.info("[Doris] Stream Load writer closed.");
+        }
+        if (logStreamLoadWriter != null) {
+            logStreamLoadWriter.close();
+            log.info("[Doris] Log Stream Load writer closed.");
+        }
+    }
+
+}
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
new file mode 100644
index 0000000000..f921ce95ef
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisMetricRow.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.sql.Timestamp;
+
+/**
+ * Internal class to represent a metric row for Doris storage.
+ * This class is used by both DorisDataStorage and DorisStreamLoadWriter.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class DorisMetricRow {
+    public String instance;
+    public String app;
+    public String metrics;
+    public String metric;
+    public byte metricType;
+    public Integer int32Value;
+    public Double doubleValue;
+    public String strValue;
+    public Timestamp recordTime;
+    public String labels;
+}
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
new file mode 100644
index 0000000000..f8f43ae0f7
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisProperties.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import org.apache.hertzbeat.common.constants.ConfigConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.warehouse.constants.WarehouseConstants;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.bind.DefaultValue;
+
+/**
+ * Apache Doris configuration properties
+ */
+@ConfigurationProperties(prefix = 
ConfigConstants.FunctionModuleConstants.WAREHOUSE
+        + SignConstants.DOT
+        + WarehouseConstants.STORE
+        + SignConstants.DOT
+        + WarehouseConstants.HistoryName.DORIS)
+public record DorisProperties(
+        @DefaultValue("false") boolean enabled,
+        @DefaultValue("jdbc:mysql://127.0.0.1:9030/hertzbeat") String url,
+        String username,
+        String password,
+        TableConfig tableConfig,
+        PoolConfig poolConfig,
+        WriteConfig writeConfig) {
+    /**
+     * Table structure configuration
+     */
+    public record TableConfig(
+            // Whether to enable dynamic partitioning (default enabled)
+            @DefaultValue("false") boolean enablePartition,
+            // Partition time unit: DAY, HOUR, MONTH
+            @DefaultValue("DAY") String partitionTimeUnit,
+            // Dynamic partition retention days
+            @DefaultValue("7") int partitionRetentionDays,
+            // Dynamic partition creation range (future partitions to create)
+            @DefaultValue("3") int partitionFutureDays,
+            // Number of buckets
+            @DefaultValue("8") int buckets,
+            // Number of replicas (recommended 3 for production)
+            @DefaultValue("1") int replicationNum,
+            // Maximum length of string columns
+            @DefaultValue("4096") int strColumnMaxLength) {
+        public TableConfig {
+            if (partitionRetentionDays <= 0) {
+                partitionRetentionDays = 7;
+            }
+            if (partitionFutureDays <= 0) {
+                partitionFutureDays = 3;
+            }
+            if (buckets <= 0) {
+                buckets = 8;
+            }
+            if (replicationNum <= 0) {
+                replicationNum = 1;
+            }
+            if (strColumnMaxLength <= 0 || strColumnMaxLength > 65533) {
+                strColumnMaxLength = 4096;
+            }
+        }
+    }
+
+    /**
+     * Connection pool configuration (based on HikariCP)
+     */
+    public record PoolConfig(
+            // Minimum idle connections
+            @DefaultValue("5") int minimumIdle,
+            // Maximum pool size
+            @DefaultValue("20") int maximumPoolSize,
+            // Connection timeout in milliseconds
+            @DefaultValue("30000") int connectionTimeout,
+            // Maximum connection lifetime in milliseconds (0 means no limit)
+            @DefaultValue("0") long maxLifetime,
+            // Idle connection timeout in milliseconds (0 means never recycle)
+            @DefaultValue("600000") long idleTimeout) {
+        public PoolConfig {
+            if (minimumIdle <= 0) {
+                minimumIdle = 5;
+            }
+            if (maximumPoolSize <= 0) {
+                maximumPoolSize = 20;
+            }
+            if (minimumIdle > maximumPoolSize) {
+                minimumIdle = maximumPoolSize;
+            }
+            if (connectionTimeout <= 0) {
+                connectionTimeout = 30000;
+            }
+        }
+    }
+
+    /**
+     * Write configuration
+     */
+    public record WriteConfig(
+            // Write mode: jdbc (batch insert) or stream (HTTP stream load)
+            @DefaultValue("jdbc") String writeMode,
+            // Batch write size (for jdbc mode)
+            @DefaultValue("1000") int batchSize,
+            // Batch write flush interval in seconds (for jdbc mode)
+            @DefaultValue("5") int flushInterval,
+            // Fallback to JDBC when stream load fails (may introduce 
duplicate data in ambiguous cases)
+            @DefaultValue("false") boolean fallbackToJdbcOnFailure,
+            // Stream load configuration (for stream mode)
+            StreamLoadConfig streamLoadConfig) {
+        public WriteConfig {
+            String normalizedWriteMode = writeMode == null ? "" : 
writeMode.trim().toLowerCase();
+            if (!"jdbc".equals(normalizedWriteMode) && 
!"stream".equals(normalizedWriteMode)) {
+                writeMode = "jdbc";
+            } else {
+                writeMode = normalizedWriteMode;
+            }
+            if (batchSize <= 0) {
+                batchSize = 1000;
+            }
+            if (flushInterval <= 0) {
+                flushInterval = 5;
+            }
+            if (streamLoadConfig == null) {
+                streamLoadConfig = StreamLoadConfig.createDefault();
+            }
+        }
+    }
+
+    /**
+     * Stream Load configuration for HTTP-based streaming writes
+     */
+    public record StreamLoadConfig(
+            // Doris FE HTTP port for Stream Load API
+            @DefaultValue(":8030") String httpPort,
+            // Stream load timeout in seconds
+            @DefaultValue("60") int timeout,
+            // Max batch size in bytes for stream load
+            @DefaultValue("10485760") int maxBytesPerBatch,
+            // Maximum allowed filter ratio in [0,1]
+            @DefaultValue("0.1") double maxFilterRatio,
+            // Enable strict mode
+            @DefaultValue("false") boolean strictMode,
+            // Import timezone, empty means Doris default
+            @DefaultValue("") String timezone,
+            // Redirect policy: direct/public/private
+            @DefaultValue("") String redirectPolicy,
+            // Group commit mode: async_mode/sync_mode/off_mode
+            @DefaultValue("") String groupCommit,
+            // Send batch parallelism, 0 means Doris default
+            @DefaultValue("0") int sendBatchParallelism,
+            // Retry times for one label when stream load is retryable
+            @DefaultValue("2") int retryTimes) {
+        public StreamLoadConfig {
+            if (httpPort == null || httpPort.isBlank()) {
+                httpPort = ":8030";
+            } else {
+                httpPort = httpPort.trim();
+            }
+            if (timeout <= 0) {
+                timeout = 60;
+            }
+            if (maxBytesPerBatch <= 0) {
+                maxBytesPerBatch = 10485760; // 10MB
+            }
+            if (maxFilterRatio < 0 || maxFilterRatio > 1) {
+                maxFilterRatio = 0.1;
+            }
+            if (sendBatchParallelism < 0) {
+                sendBatchParallelism = 0;
+            }
+            if (retryTimes < 0) {
+                retryTimes = 2;
+            }
+            if (!isValidRedirectPolicy(redirectPolicy)) {
+                redirectPolicy = "";
+            }
+            if (!isValidGroupCommit(groupCommit)) {
+                groupCommit = "";
+            }
+            timezone = timezone == null ? "" : timezone.trim();
+        }
+
+        /**
+         * Factory method to create default StreamLoadConfig
+         */
+        public static StreamLoadConfig createDefault() {
+            return new StreamLoadConfig(":8030", 60, 10485760,
+                    0.1, false, "", "", "", 0, 2);
+        }
+
+        private static boolean isValidRedirectPolicy(String value) {
+            if (value == null || value.isBlank()) {
+                return true;
+            }
+            return "direct".equalsIgnoreCase(value)
+                    || "public".equalsIgnoreCase(value)
+                    || "private".equalsIgnoreCase(value);
+        }
+
+        private static boolean isValidGroupCommit(String value) {
+            if (value == null || value.isBlank()) {
+                return true;
+            }
+            return "async_mode".equalsIgnoreCase(value)
+                    || "sync_mode".equalsIgnoreCase(value)
+                    || "off_mode".equalsIgnoreCase(value);
+        }
+    }
+
+    // Provide default values for nested configs if null
+    public DorisProperties {
+        if (tableConfig == null) {
+            tableConfig = new TableConfig(false, "DAY", 7, 3, 8, 1, 4096);
+        }
+        if (poolConfig == null) {
+            poolConfig = new PoolConfig(5, 20, 30000, 0, 600000);
+        }
+        if (writeConfig == null) {
+            writeConfig = new WriteConfig("jdbc", 1000, 5, false, 
StreamLoadConfig.createDefault());
+        }
+    }
+}
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
new file mode 100644
index 0000000000..d45213a91e
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisStreamLoadWriter.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.http.Header;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpHeaders;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.ProtocolException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Doris Stream Load writer using Apache HttpClient for high-throughput writes.
+ * Based on official Apache Doris Stream Load Java example.
+ * <p>
+ * Reference: <a 
href="https://github.com/apache/doris/blob/master/samples/doris-stream-load-demo/src/main/java/DorisStreamLoad.java";>...</a>
+ */
+@Slf4j
+public class DorisStreamLoadWriter {
+
+    private static final int CONNECT_TIMEOUT = 30000;
+    private static final int SOCKET_TIMEOUT = 60000;
+    private static final int CONNECTION_REQUEST_TIMEOUT = 5000;
+    private static final int MAX_TOTAL_CONNECTIONS = 20;
+    private static final int MAX_PER_ROUTE_CONNECTIONS = 10;
+    private static final long RETRY_BACKOFF_MS = 200L;
+    private static final DateTimeFormatter DORIS_DATETIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private static final String METRIC_JSON_PATHS =
+            
"[\"$.instance\",\"$.app\",\"$.metrics\",\"$.metric\",\"$.recordTime\",\"$.metricType\",\"$.int32Value\",\"$.doubleValue\",\"$.strValue\",\"$.labels\"]";
+    private static final String METRIC_COLUMNS =
+            
"instance,app,metrics,metric,record_time,metric_type,int32_value,double_value,str_value,labels";
+    private static final String LOG_JSON_PATHS =
+            
"[\"$.timeUnixNano\",\"$.observedTimeUnixNano\",\"$.eventTime\",\"$.severityNumber\",\"$.severityText\""
+                    + 
",\"$.body\",\"$.traceId\",\"$.spanId\",\"$.traceFlags\",\"$.attributes\",\"$.resource\""
+                    + 
",\"$.instrumentationScope\",\"$.droppedAttributesCount\"]";
+    private static final String LOG_COLUMNS =
+            
"time_unix_nano,observed_time_unix_nano,event_time,severity_number,severity_text,body,trace_id,span_id,trace_flags,attributes,resource,instrumentation_scope,dropped_attributes_count";
+
+    public static final String STATUS_SUCCESS = "Success";
+    public static final String STATUS_PUBLISH_TIMEOUT = "Publish Timeout";
+    public static final String STATUS_FAIL = "Fail";
+    public static final String STATUS_LABEL_ALREADY_EXISTS = "Label Already 
Exists";
+
+    private final String databaseName;
+    private final String tableName;
+    private final String feHost;
+    private final int feHttpPort;
+    private final String username;
+    private final String password;
+    private final DorisProperties.StreamLoadConfig config;
+
+    private final AtomicLong transactionId = new AtomicLong(0);
+    private final CloseableHttpClient httpClient;
+
+    private enum LoadResult {
+        SUCCESS,
+        RETRYABLE_FAILURE,
+        NON_RETRYABLE_FAILURE
+    }
+
+    private static final class StreamLoadMetricRow {
+        public String instance;
+        public String app;
+        public String metrics;
+        public String metric;
+        public String recordTime;
+        public Byte metricType;
+        public Integer int32Value;
+        public Double doubleValue;
+        public String strValue;
+        public String labels;
+    }
+
+    private static final class StreamLoadLogRow {
+        public Long timeUnixNano;
+        public Long observedTimeUnixNano;
+        public String eventTime;
+        public Integer severityNumber;
+        public String severityText;
+        public String body;
+        public String traceId;
+        public String spanId;
+        public Integer traceFlags;
+        public String attributes;
+        public String resource;
+        public String instrumentationScope;
+        public Integer droppedAttributesCount;
+    }
+
+    /**
+     * -- GETTER --
+     * Check if writer is available
+     */
+    @Getter
+    private volatile boolean available = true;
+
+    public DorisStreamLoadWriter(String databaseName, String tableName,
+                                  String jdbcUrl, String username, String 
password,
+                                  DorisProperties.StreamLoadConfig config) {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.username = username;
+        this.password = password;
+        this.config = config;
+
+        this.feHost = parseHostFromJdbcUrl(jdbcUrl);
+        this.feHttpPort = parseHttpPort(config.httpPort());
+
+        // Create HTTP client with connection pool and redirect support
+        this.httpClient = createHttpClient();
+
+        log.info("[Doris StreamLoad] Writer initialized for {}.{}", 
databaseName, tableName);
+    }
+
+    private String parseHostFromJdbcUrl(String jdbcUrl) {
+        if (jdbcUrl == null || jdbcUrl.isBlank()) {
+            return "127.0.0.1";
+        }
+
+        String hostPart = jdbcUrl;
+        if (hostPart.startsWith("jdbc:mysql://")) {
+            hostPart = hostPart.substring("jdbc:mysql://".length());
+        }
+
+        int slashIndex = hostPart.indexOf('/');
+        if (slashIndex > 0) {
+            hostPart = hostPart.substring(0, slashIndex);
+        }
+
+        int queryIndex = hostPart.indexOf('?');
+        if (queryIndex > 0) {
+            hostPart = hostPart.substring(0, queryIndex);
+        }
+
+        // Multi-host URL: use first host
+        int commaIndex = hostPart.indexOf(',');
+        if (commaIndex > 0) {
+            hostPart = hostPart.substring(0, commaIndex);
+        }
+
+        int colonIndex = hostPart.lastIndexOf(':');
+        if (colonIndex > 0) {
+            hostPart = hostPart.substring(0, colonIndex);
+        }
+
+        return hostPart;
+    }
+
+    private int parseHttpPort(String portStr) {
+        if (portStr == null || portStr.isBlank()) {
+            return 8030;
+        }
+        if (portStr.startsWith(":")) {
+            portStr = portStr.substring(1);
+        }
+        try {
+            return Integer.parseInt(portStr);
+        } catch (NumberFormatException e) {
+            return 8030; // default
+        }
+    }
+
+    /**
+     * Create HTTP client with connection pool and automatic redirect handling
+     * Replaces internal IPs in redirect URLs with external FE host for cloud 
deployments
+     */
+    private CloseableHttpClient createHttpClient() {
+        // Connection pool configuration
+        PoolingHttpClientConnectionManager connectionManager = new 
PoolingHttpClientConnectionManager();
+        connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
+        connectionManager.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);
+
+        // Request configuration
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(CONNECT_TIMEOUT)
+                .setSocketTimeout(SOCKET_TIMEOUT)
+                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+                .build();
+
+        // Custom redirect strategy to handle PUT requests and replace 
internal IPs
+        DefaultRedirectStrategy redirectStrategy = new 
DefaultRedirectStrategy() {
+            @Override
+            protected boolean isRedirectable(String method) {
+                // Enable redirect for PUT method (required for Doris Stream 
Load)
+                return true;
+            }
+
+            @Override
+            public HttpUriRequest getRedirect(
+                    HttpRequest request,
+                    HttpResponse response,
+                    HttpContext context) throws ProtocolException {
+                URI uri = getLocationURI(request, response, context);
+                String redirectLocation = uri.toString();
+
+                // Check if redirect points to localhost/internal IP
+                String host = uri.getHost();
+                if (host != null && ("127.0.0.1".equals(host) || 
"localhost".equals(host)
+                        || host.startsWith("192.168.") || 
host.startsWith("10.")
+                        || "0.0.0.0".equals(host))) {
+                    // Replace internal IP with external FE host, keep BE port 
and path
+                    int port = uri.getPort() > 0 ? uri.getPort() : 8040;
+                    String path = uri.getPath();
+                    String query = uri.getQuery();
+                    redirectLocation = "http://"; + feHost + ":" + port + path
+                            + (query != null ? "?" + query : "");
+                    log.debug("[Doris StreamLoad] Replaced internal IP {} with 
external {}:{}",
+                            host, feHost, port);
+                    try {
+                        uri = new URI(redirectLocation);
+                    } catch (URISyntaxException e) {
+                        log.warn("[Doris StreamLoad] Failed to build URI for 
redirect location", e);
+                    }
+                }
+
+                // Remove any embedded credentials like http://root:@
+                if (redirectLocation.contains("@")) {
+                    try {
+                        URIBuilder uriBuilder = new 
URIBuilder(redirectLocation);
+                        uriBuilder.setUserInfo(null);
+                        uri = uriBuilder.build();
+                    } catch (URISyntaxException e) {
+                        log.warn("[Doris StreamLoad] Failed to remove 
credentials from redirect URL", e);
+                    }
+                }
+
+                // Create new redirect request
+                HttpPut redirect = new HttpPut(uri);
+                if (request instanceof HttpEntityEnclosingRequest sourceRequest
+                        && sourceRequest.getEntity() != null) {
+                    redirect.setEntity(sourceRequest.getEntity());
+                }
+                // Copy headers from original request
+                copyHeaders(request, redirect);
+
+                return redirect;
+            }
+
+            private void copyHeaders(HttpRequest source, HttpUriRequest 
target) {
+                // Keep all business headers for redirected PUT request.
+                for (Header header : source.getAllHeaders()) {
+                    String name = header.getName();
+                    if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)
+                            || HttpHeaders.HOST.equalsIgnoreCase(name)
+                            || 
HttpHeaders.TRANSFER_ENCODING.equalsIgnoreCase(name)) {
+                        continue;
+                    }
+                    target.setHeader(name, header.getValue());
+                }
+            }
+        };
+
+        return HttpClients.custom()
+                .setConnectionManager(connectionManager)
+                .setDefaultRequestConfig(requestConfig)
+                .setRedirectStrategy(redirectStrategy)
+                .build();
+    }
+
+    /**
+     * Build Basic Authentication header
+     */
+    private String basicAuthHeader(String username, String password) {
+        String toBeEncode = username + ":" + (password != null ? password : 
"");
+        byte[] encoded = 
Base64.encodeBase64(toBeEncode.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Build unique label for idempotency
+     */
+    private String buildLabel() {
+        return "hzb_" + System.currentTimeMillis() + "_" + 
transactionId.incrementAndGet();
+    }
+
+    /**
+     * Build Stream Load URL
+     */
+    private String buildLoadUrl() {
+        return String.format("http://%s:%d/api/%s/%s/_stream_load";,
+                feHost, feHttpPort, databaseName, tableName);
+    }
+
+    /**
+     * Write metrics data using Stream Load API
+     *
+     * @param rows list of metric rows to write
+     * @return true if write succeeded, false otherwise
+     */
+    public boolean write(List<DorisMetricRow> rows) {
+        if (!available || rows == null || rows.isEmpty()) {
+            return false;
+        }
+
+        return writeMetricWithAutoSplit(new ArrayList<>(rows));
+    }
+
+    public boolean writeLogs(List<LogEntry> logEntries) {
+        if (!available || logEntries == null || logEntries.isEmpty()) {
+            return false;
+        }
+        return writeLogWithAutoSplit(new ArrayList<>(logEntries));
+    }
+
+    private boolean writeMetricWithAutoSplit(List<DorisMetricRow> rows) {
+        List<StreamLoadMetricRow> streamLoadRows = toStreamLoadRows(rows);
+        String jsonData = JsonUtil.toJson(streamLoadRows);
+        if (jsonData == null) {
+            log.error("[Doris StreamLoad] Failed to serialize {} rows to 
JSON.", rows.size());
+            return false;
+        }
+
+        int payloadBytes = jsonData.getBytes(StandardCharsets.UTF_8).length;
+        int maxBytesPerBatch = config.maxBytesPerBatch();
+        if (payloadBytes > maxBytesPerBatch && rows.size() > 1) {
+            int mid = rows.size() / 2;
+            List<DorisMetricRow> left = new ArrayList<>(rows.subList(0, mid));
+            List<DorisMetricRow> right = new ArrayList<>(rows.subList(mid, 
rows.size()));
+            log.debug("[Doris StreamLoad] Split batch: rows={}, bytes={}, 
maxBytes={}",
+                    rows.size(), payloadBytes, maxBytesPerBatch);
+            return writeMetricWithAutoSplit(left) && 
writeMetricWithAutoSplit(right);
+        }
+
+        String label = buildLabel();
+        return writeSingleBatch(rows.size(), jsonData, label, 
METRIC_JSON_PATHS, METRIC_COLUMNS);
+    }
+
+    private boolean writeLogWithAutoSplit(List<LogEntry> logEntries) {
+        List<StreamLoadLogRow> streamLoadRows = 
toStreamLoadLogRows(logEntries);
+        String jsonData = JsonUtil.toJson(streamLoadRows);
+        if (jsonData == null) {
+            log.error("[Doris StreamLoad] Failed to serialize {} log entries 
to JSON.", logEntries.size());
+            return false;
+        }
+
+        int payloadBytes = jsonData.getBytes(StandardCharsets.UTF_8).length;
+        int maxBytesPerBatch = config.maxBytesPerBatch();
+        if (payloadBytes > maxBytesPerBatch && logEntries.size() > 1) {
+            int mid = logEntries.size() / 2;
+            List<LogEntry> left = new ArrayList<>(logEntries.subList(0, mid));
+            List<LogEntry> right = new ArrayList<>(logEntries.subList(mid, 
logEntries.size()));
+            log.debug("[Doris StreamLoad] Split log batch: rows={}, bytes={}, 
maxBytes={}",
+                    logEntries.size(), payloadBytes, maxBytesPerBatch);
+            return writeLogWithAutoSplit(left) && writeLogWithAutoSplit(right);
+        }
+
+        String label = buildLabel();
+        return writeSingleBatch(logEntries.size(), jsonData, label, 
LOG_JSON_PATHS, LOG_COLUMNS);
+    }
+
+    private List<StreamLoadMetricRow> toStreamLoadRows(List<DorisMetricRow> 
rows) {
+        List<StreamLoadMetricRow> result = new ArrayList<>(rows.size());
+        for (DorisMetricRow row : rows) {
+            StreamLoadMetricRow item = new StreamLoadMetricRow();
+            item.instance = row.instance;
+            item.app = row.app;
+            item.metrics = row.metrics;
+            item.metric = row.metric;
+            item.recordTime = formatRecordTime(row.recordTime);
+            item.metricType = row.metricType;
+            item.int32Value = row.int32Value;
+            item.doubleValue = row.doubleValue;
+            item.strValue = row.strValue;
+            item.labels = row.labels;
+            result.add(item);
+        }
+        return result;
+    }
+
+    private String formatRecordTime(Timestamp timestamp) {
+        if (timestamp == null) {
+            return null;
+        }
+        return 
DORIS_DATETIME_FORMATTER.format(timestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
+    }
+
+    private List<StreamLoadLogRow> toStreamLoadLogRows(List<LogEntry> 
logEntries) {
+        List<StreamLoadLogRow> result = new ArrayList<>(logEntries.size());
+        for (LogEntry logEntry : logEntries) {
+            long timeUnixNano = logEntry.getTimeUnixNano() != null
+                    ? logEntry.getTimeUnixNano()
+                    : System.currentTimeMillis() * 1_000_000L;
+            long observedTimeUnixNano = logEntry.getObservedTimeUnixNano() != 
null
+                    ? logEntry.getObservedTimeUnixNano()
+                    : timeUnixNano;
+
+            StreamLoadLogRow row = new StreamLoadLogRow();
+            row.timeUnixNano = timeUnixNano;
+            row.observedTimeUnixNano = observedTimeUnixNano;
+            row.eventTime = formatEpochNanos(timeUnixNano);
+            row.severityNumber = logEntry.getSeverityNumber();
+            row.severityText = logEntry.getSeverityText();
+            row.body = JsonUtil.toJson(logEntry.getBody());
+            row.traceId = logEntry.getTraceId();
+            row.spanId = logEntry.getSpanId();
+            row.traceFlags = logEntry.getTraceFlags();
+            row.attributes = JsonUtil.toJson(logEntry.getAttributes());
+            row.resource = JsonUtil.toJson(logEntry.getResource());
+            row.instrumentationScope = 
JsonUtil.toJson(logEntry.getInstrumentationScope());
+            row.droppedAttributesCount = logEntry.getDroppedAttributesCount();
+            result.add(row);
+        }
+        return result;
+    }
+
+    private String formatEpochNanos(long epochNanos) {
+        long millis = epochNanos / 1_000_000L;
+        return DORIS_DATETIME_FORMATTER.format(
+                
Instant.ofEpochMilli(millis).atZone(ZoneId.systemDefault()).toLocalDateTime());
+    }
+
+    private boolean writeSingleBatch(int rowCount, String jsonData, String 
label,
+                                     String jsonPaths, String columns) {
+        String loadUrl = buildLoadUrl();
+        int maxAttempts = Math.max(1, config.retryTimes() + 1);
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            LoadResult loadResult;
+            String responseBody = "";
+            int statusCode = -1;
+            String action = String.format("label=%s attempt=%d/%d rows=%d",
+                    label, attempt, maxAttempts, rowCount);
+
+            HttpPut httpPut = new HttpPut(loadUrl);
+            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
+            httpPut.setHeader(HttpHeaders.AUTHORIZATION, 
basicAuthHeader(username, password));
+            httpPut.setHeader("label", label);
+            httpPut.setHeader("format", "json");
+            httpPut.setHeader("strip_outer_array", "true");
+            httpPut.setHeader("timeout", String.valueOf(config.timeout()));
+            httpPut.setHeader("max_filter_ratio", 
String.valueOf(config.maxFilterRatio()));
+            httpPut.setHeader("strict_mode", 
String.valueOf(config.strictMode()));
+            httpPut.setHeader("jsonpaths", jsonPaths);
+            httpPut.setHeader("columns", columns);
+            setHeaderIfHasText(httpPut, "timezone", config.timezone());
+            setHeaderIfHasText(httpPut, "redirect-policy", 
config.redirectPolicy());
+            setHeaderIfHasText(httpPut, "group_commit", config.groupCommit());
+            if (config.sendBatchParallelism() > 0) {
+                httpPut.setHeader("send_batch_parallelism", 
String.valueOf(config.sendBatchParallelism()));
+            }
+            httpPut.setEntity(new StringEntity(jsonData, 
StandardCharsets.UTF_8));
+
+            try (CloseableHttpResponse response = httpClient.execute(httpPut)) 
{
+                statusCode = response.getStatusLine().getStatusCode();
+                if (response.getEntity() != null) {
+                    responseBody = EntityUtils.toString(response.getEntity());
+                }
+                loadResult = handleResponse(statusCode, responseBody, 
rowCount);
+            } catch (Exception e) {
+                log.warn("[Doris StreamLoad] Request error, {}: {}", action, 
e.getMessage());
+                loadResult = LoadResult.RETRYABLE_FAILURE;
+            }
+
+            if (loadResult == LoadResult.SUCCESS) {
+                return true;
+            }
+
+            if (loadResult == LoadResult.NON_RETRYABLE_FAILURE || attempt >= 
maxAttempts) {
+                log.error("[Doris StreamLoad] Batch failed, {} statusCode={} 
response={}",
+                        action, statusCode, responseBody);
+                return false;
+            }
+
+            try {
+                Thread.sleep(RETRY_BACKOFF_MS * attempt);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("[Doris StreamLoad] Retry interrupted, {}", action);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    private void setHeaderIfHasText(HttpPut httpPut, String name, String 
value) {
+        if (value != null && !value.isBlank()) {
+            httpPut.setHeader(name, value.trim());
+        }
+    }
+
+    /**
+     * Handle Stream Load response
+     *
+     * Note: statusCode 200 only indicates the BE service is OK, not that 
stream load succeeded.
+     * We must parse the response JSON to check the actual status.
+     */
+    private LoadResult handleResponse(int statusCode, String body, int 
rowCount) {
+        if (statusCode != 200) {
+            log.warn("[Doris StreamLoad] HTTP request returned status {}, 
response: {}",
+                    statusCode, body);
+            return statusCode >= 500 ? LoadResult.RETRYABLE_FAILURE : 
LoadResult.NON_RETRYABLE_FAILURE;
+        }
+
+        JsonNode jsonNode = JsonUtil.fromJson(body);
+        if (jsonNode == null || !jsonNode.has("Status")) {
+            log.error("[Doris StreamLoad] Invalid response body: {}", body);
+            return LoadResult.NON_RETRYABLE_FAILURE;
+        }
+
+        String status = jsonNode.get("Status").asText("");
+
+        if (STATUS_SUCCESS.equalsIgnoreCase(status)) {
+            log.info("[Doris StreamLoad] Successfully loaded {} rows", 
rowCount);
+            return LoadResult.SUCCESS;
+        }
+
+        if (STATUS_PUBLISH_TIMEOUT.equalsIgnoreCase(status)) {
+            log.warn("[Doris StreamLoad] Publish Timeout for {} rows, treated 
as success. Response: {}",
+                    rowCount, body);
+            return LoadResult.SUCCESS;
+        }
+
+        if (STATUS_LABEL_ALREADY_EXISTS.equalsIgnoreCase(status)) {
+            String existingStatus = jsonNode.has("ExistingJobStatus")
+                    ? jsonNode.get("ExistingJobStatus").asText("")
+                    : "";
+            if ("FINISHED".equalsIgnoreCase(existingStatus) || 
"VISIBLE".equalsIgnoreCase(existingStatus)) {
+                log.info("[Doris StreamLoad] Label exists and already finished 
for {} rows", rowCount);
+                return LoadResult.SUCCESS;
+            }
+
+            return LoadResult.RETRYABLE_FAILURE;
+        }
+
+        if (STATUS_FAIL.equalsIgnoreCase(status)) {
+            log.error("[Doris StreamLoad] Failed to load {} rows, status={}, 
response={}",
+                    rowCount, status, body);
+            return LoadResult.NON_RETRYABLE_FAILURE;
+        }
+
+        log.warn("[Doris StreamLoad] Unexpected status={} for {} rows, 
response={}",
+                status, rowCount, body);
+        return LoadResult.RETRYABLE_FAILURE;
+    }
+
+    /**
+     * Get current statistics
+     */
+    public String getStats() {
+        return String.format("transactions=%d, available=%s", 
transactionId.get(), available);
+    }
+
+    /**
+     * Close the HTTP client and release resources
+     */
+    public void close() {
+        available = false;
+        try {
+            if (httpClient != null) {
+                httpClient.close();
+                log.info("[Doris StreamLoad] HTTP client closed");
+            }
+        } catch (IOException e) {
+            log.error("[Doris StreamLoad] Failed to close HTTP client", e);
+        }
+    }
+}
diff --git 
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
 
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
new file mode 100644
index 0000000000..a7eff2b56e
--- /dev/null
+++ 
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorageTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link DorisDataStorage}.
+ */
+@ExtendWith(MockitoExtension.class)
+class DorisDataStorageTest {
+
+    private static final long NANOS_PER_MILLISECOND = 1_000_000L;
+
+    @Mock
+    private WarehouseWorkerPool workerPool;
+
+    @Test
+    void queryLogsWithPaginationShouldContainLimitAndOffsetClauses() throws 
Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        
when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
+        when(context.resultSet().next()).thenReturn(false);
+
+        context.storage().queryLogsByMultipleConditionsWithPagination(
+                1000L, 2000L, null, null, null, null, null, 5, 20
+        );
+
+        ArgumentCaptor<String> sqlCaptor = 
ArgumentCaptor.forClass(String.class);
+        
verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
+        String sql = sqlCaptor.getValue();
+
+        assertThat(sql).contains("ORDER BY time_unix_nano DESC");
+        assertThat(sql).contains("LIMIT ?");
+        assertThat(sql).contains("OFFSET ?");
+    }
+
+    @Test
+    void queryLogsShouldBindAllConditionsWithConvertedTimeInOrder() throws 
Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        
when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
+        when(context.resultSet().next()).thenReturn(false);
+
+        context.storage().queryLogsByMultipleConditions(
+                1000L, 2000L, "trace-1", "span-1", 9, "INFO", "error"
+        );
+
+        ArgumentCaptor<String> sqlCaptor = 
ArgumentCaptor.forClass(String.class);
+        
verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
+        String sql = sqlCaptor.getValue();
+
+        assertThat(sql).contains("WHERE");
+        assertThat(sql).contains("time_unix_nano >= ?");
+        assertThat(sql).contains("time_unix_nano <= ?");
+        assertThat(sql).contains("trace_id = ?");
+        assertThat(sql).contains("span_id = ?");
+        assertThat(sql).contains("severity_number = ?");
+        assertThat(sql).contains("severity_text = ?");
+        assertThat(sql).contains("body LIKE ?");
+        assertThat(sql).contains("ORDER BY time_unix_nano DESC");
+
+        verify(context.queryPreparedStatement()).setObject(1, 1000L * 
NANOS_PER_MILLISECOND);
+        verify(context.queryPreparedStatement()).setObject(2, 2000L * 
NANOS_PER_MILLISECOND);
+        verify(context.queryPreparedStatement()).setObject(3, "trace-1");
+        verify(context.queryPreparedStatement()).setObject(4, "span-1");
+        verify(context.queryPreparedStatement()).setObject(5, 9);
+        verify(context.queryPreparedStatement()).setObject(6, "INFO");
+        verify(context.queryPreparedStatement()).setObject(7, "%error%");
+    }
+
+    @Test
+    void queryLogsWithPaginationShouldNotAppendOffsetWhenOffsetIsZero() throws 
Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        
when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
+        when(context.resultSet().next()).thenReturn(false);
+
+        context.storage().queryLogsByMultipleConditionsWithPagination(
+                null, null, null, null, null, null, null, 0, 20
+        );
+
+        ArgumentCaptor<String> sqlCaptor = 
ArgumentCaptor.forClass(String.class);
+        
verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
+        String sql = sqlCaptor.getValue();
+
+        assertThat(sql).contains("LIMIT ?");
+        assertThat(sql).doesNotContain("OFFSET ?");
+        verify(context.queryPreparedStatement()).setObject(1, 20);
+    }
+
+    @Test
+    void countLogsShouldReturnCountAndBindConditions() throws Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        
when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
+        when(context.resultSet().next()).thenReturn(true);
+        when(context.resultSet().getLong("count")).thenReturn(5L);
+
+        long count = context.storage().countLogsByMultipleConditions(
+                1000L, null, "trace-1", null, null, null, null
+        );
+
+        assertThat(count).isEqualTo(5L);
+
+        ArgumentCaptor<String> sqlCaptor = 
ArgumentCaptor.forClass(String.class);
+        
verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
+        String sql = sqlCaptor.getValue();
+
+        assertThat(sql).contains("SELECT COUNT(*) AS count");
+        assertThat(sql).contains("time_unix_nano >= ?");
+        assertThat(sql).contains("trace_id = ?");
+        verify(context.queryPreparedStatement()).setObject(1, 1000L * 
NANOS_PER_MILLISECOND);
+        verify(context.queryPreparedStatement()).setObject(2, "trace-1");
+    }
+
+    @Test
+    void batchDeleteLogsShouldFilterNullValuesAndExecuteDelete() throws 
Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        when(context.queryPreparedStatement().executeUpdate()).thenReturn(2);
+
+        List<Long> timeUnixNanos = new ArrayList<>();
+        timeUnixNanos.add(111L);
+        timeUnixNanos.add(null);
+        timeUnixNanos.add(333L);
+
+        boolean deleted = context.storage().batchDeleteLogs(timeUnixNanos);
+
+        assertThat(deleted).isTrue();
+
+        ArgumentCaptor<String> sqlCaptor = 
ArgumentCaptor.forClass(String.class);
+        
verify(context.queryConnection()).prepareStatement(sqlCaptor.capture());
+        assertThat(sqlCaptor.getValue()).contains("time_unix_nano IN (?,?)");
+        verify(context.queryPreparedStatement()).setLong(1, 111L);
+        verify(context.queryPreparedStatement()).setLong(2, 333L);
+    }
+
+    @Test
+    void batchDeleteLogsShouldReturnFalseWhenAllValuesAreNull() throws 
Exception {
+        DorisDataStorage storage = 
createStorageContext(createProperties(false));
+
+        List<Long> timeUnixNanos = new ArrayList<>();
+        timeUnixNanos.add(null);
+        timeUnixNanos.add(null);
+
+        boolean deleted = storage.batchDeleteLogs(timeUnixNanos);
+
+        assertThat(deleted).isFalse();
+    }
+
+    @Test
+    void queryLogsShouldMapJsonColumnsToLogEntryFields() throws Exception {
+        QueryStorageContext context = 
createQueryStorageContext(createProperties(false));
+        
when(context.queryPreparedStatement().executeQuery()).thenReturn(context.resultSet());
+        ResultSet resultSet = context.resultSet();
+
+        when(resultSet.next()).thenReturn(true, false);
+        when(resultSet.getLong("time_unix_nano")).thenReturn(111L);
+        when(resultSet.getLong("observed_time_unix_nano")).thenReturn(222L);
+        when(resultSet.getInt("severity_number")).thenReturn(9);
+        when(resultSet.getString("severity_text")).thenReturn("INFO");
+        when(resultSet.getString("body")).thenReturn("{\"message\":\"ok\"}");
+        when(resultSet.getString("trace_id")).thenReturn("trace-1");
+        when(resultSet.getString("span_id")).thenReturn("span-1");
+        when(resultSet.getInt("trace_flags")).thenReturn(1);
+        when(resultSet.getString("attributes")).thenReturn("{\"k\":\"v\"}");
+        
when(resultSet.getString("resource")).thenReturn("{\"service\":\"warehouse\"}");
+        
when(resultSet.getString("instrumentation_scope")).thenReturn("{\"name\":\"scope\",\"version\":\"1.0.0\"}");
+        when(resultSet.getInt("dropped_attributes_count")).thenReturn(3);
+        when(resultSet.wasNull()).thenReturn(false, false, false);
+
+        List<LogEntry> entries = 
context.storage().queryLogsByMultipleConditions(
+                null, null, null, null, null, null, null
+        );
+
+        assertThat(entries).hasSize(1);
+        LogEntry entry = entries.get(0);
+        assertThat(entry.getTimeUnixNano()).isEqualTo(111L);
+        assertThat(entry.getObservedTimeUnixNano()).isEqualTo(222L);
+        assertThat(entry.getSeverityNumber()).isEqualTo(9);
+        assertThat(entry.getSeverityText()).isEqualTo("INFO");
+        assertThat(entry.getTraceId()).isEqualTo("trace-1");
+        assertThat(entry.getSpanId()).isEqualTo("span-1");
+        assertThat(entry.getBody()).isInstanceOf(Map.class);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> body = (Map<String, Object>) entry.getBody();
+        assertThat(body).containsEntry("message", "ok");
+        assertThat(entry.getAttributes()).containsEntry("k", "v");
+        assertThat(entry.getResource()).containsEntry("service", "warehouse");
+        assertThat(entry.getInstrumentationScope()).isNotNull();
+        
assertThat(entry.getInstrumentationScope().getName()).isEqualTo("scope");
+        
assertThat(entry.getInstrumentationScope().getVersion()).isEqualTo("1.0.0");
+    }
+
+    private QueryStorageContext createQueryStorageContext(DorisProperties 
properties) {
+        Connection initConnection = mock(Connection.class);
+        Statement initStatement = mock(Statement.class);
+        Connection tableConnection = mock(Connection.class);
+        Statement tableStatement = mock(Statement.class);
+        Connection queryConnection = mock(Connection.class);
+        PreparedStatement queryPreparedStatement = 
mock(PreparedStatement.class);
+        ResultSet resultSet = mock(ResultSet.class);
+        AtomicInteger dataSourceConnectionCalls = new AtomicInteger(0);
+        String baseUrl = properties.url();
+        String username = properties.username();
+        String password = properties.password();
+        try {
+            when(initConnection.createStatement()).thenReturn(initStatement);
+            when(tableConnection.createStatement()).thenReturn(tableStatement);
+            
when(queryConnection.prepareStatement(anyString())).thenReturn(queryPreparedStatement);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        try (MockedStatic<DriverManager> driverManagerMock = 
mockStatic(DriverManager.class);
+             MockedConstruction<HikariDataSource> 
hikariDataSourceMockedConstruction = mockConstruction(
+                     HikariDataSource.class,
+                     (mock, context) -> 
when(mock.getConnection()).thenAnswer(invocation -> {
+                         int call = 
dataSourceConnectionCalls.getAndIncrement();
+                         return call == 0 ? tableConnection : queryConnection;
+                     }))) {
+            driverManagerMock.when(() -> DriverManager.getConnection(baseUrl, 
username, password))
+                    .thenReturn(initConnection);
+            DorisDataStorage storage = new DorisDataStorage(properties, 
workerPool);
+            return new QueryStorageContext(storage, queryConnection, 
queryPreparedStatement, resultSet);
+        }
+    }
+
+    private DorisDataStorage createStorageContext(DorisProperties properties) {
+        Connection initConnection = mock(Connection.class);
+        Statement initStatement = mock(Statement.class);
+        Connection tableConnection = mock(Connection.class);
+        Statement tableStatement = mock(Statement.class);
+        String baseUrl = properties.url();
+        String username = properties.username();
+        String password = properties.password();
+        try {
+            when(initConnection.createStatement()).thenReturn(initStatement);
+            when(tableConnection.createStatement()).thenReturn(tableStatement);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        try (MockedStatic<DriverManager> driverManagerMock = 
mockStatic(DriverManager.class);
+             MockedConstruction<HikariDataSource> 
hikariDataSourceMockedConstruction = mockConstruction(
+                     HikariDataSource.class, (mock, context) -> 
when(mock.getConnection()).thenReturn(tableConnection))) {
+            driverManagerMock.when(() -> DriverManager.getConnection(baseUrl, 
username, password))
+                    .thenReturn(initConnection);
+            return new DorisDataStorage(properties, workerPool);
+        }
+    }
+
+    private DorisProperties createProperties(boolean enablePartition) {
+        DorisProperties.TableConfig tableConfig = new 
DorisProperties.TableConfig(
+                enablePartition, "HOUR", 2, 1, 12, 1, 4096
+        );
+        DorisProperties.PoolConfig poolConfig = new DorisProperties.PoolConfig(
+                1, 2, 500, 0, 60_000
+        );
+        DorisProperties.WriteConfig writeConfig = new 
DorisProperties.WriteConfig(
+                "jdbc", 1000, 5, false, 
DorisProperties.StreamLoadConfig.createDefault()
+        );
+        return new DorisProperties(
+                true,
+                "jdbc:mysql://127.0.0.1:9030/hertzbeat",
+                "root",
+                "123456",
+                tableConfig,
+                poolConfig,
+                writeConfig
+        );
+    }
+
+    private record QueryStorageContext(DorisDataStorage storage, Connection 
queryConnection,
+                                       PreparedStatement 
queryPreparedStatement, ResultSet resultSet) {
+    }
+}
diff --git a/home/docs/start/doris-init.md b/home/docs/start/doris-init.md
new file mode 100644
index 0000000000..78b47732d8
--- /dev/null
+++ b/home/docs/start/doris-init.md
@@ -0,0 +1,222 @@
+---
+id: doris-init
+title: Use Apache Doris to Store Metrics and Logs Data (Optional)
+sidebar_label: Metrics/Logs Store Doris
+---
+
+Apache HertzBeat's historical data storage relies on the time series database, 
you can choose one of them to install and initialize, or not to install (note 
⚠️ but it is strongly recommended to configure in the production environment)
+
+> It is recommended to use Greptime as metrics storage.
+
+Apache Doris is an MPP-based real-time analytics database. In HertzBeat, Doris 
can be used to store:
+
+- Metrics history data (`hzb_history`)
+- Log data (`hzb_log`)
+
+**⚠️ If you do not configure a time-series database, only the last hour of 
historical data is retained.**
+
+> If you already have a Doris cluster, skip directly to the YML configuration 
section.
+
+### Install Doris (Optional)
+
+You can deploy Doris by package or Docker. For production, follow the official 
deployment guide:
+
+- Doris docs: [Quick 
Start](https://doris.apache.org/docs/4.x/gettingStarted/quick-start/)
+
+For HertzBeat integration, ensure at least:
+
+- FE MySQL service port is reachable (default `9030`)
+- FE HTTP service port is reachable (default `8030`)
+
+### Note: Install MySQL JDBC Driver Jar
+
+- Download MySQL JDBC driver jar, for example mysql-connector-java-8.1.0.jar. 
[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0](https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0)
+- Copy this jar to the `ext-lib` directory in HertzBeat installation directory.
+- Restart HertzBeat service.
+
+### Prerequisites
+
+1. Doris FE and BE are running normally.
+2. HertzBeat can access:
+   - FE MySQL port (default `9030`) for metadata/query
+   - FE HTTP port (default `8030`) for Stream Load
+3. The configured Doris user has permission to create database/table and 
insert/query data.
+
+### Configure Doris in HertzBeat `application.yml`
+
+1. Edit `hertzbeat/config/application.yml`.
+
+   For Docker deployment, mount the config file from host.
+   For package deployment, modify `hertzbeat/config/application.yml` directly.
+
+2. Configure `warehouse.store.doris` (Production Environment Recommended using 
Stream Load Mode):
+
+```yaml
+warehouse:
+  store:
+    doris:
+      enabled: true
+      # FE MySQL endpoint
+      url: jdbc:mysql://127.0.0.1:9030
+      username: root
+      password:
+
+      table-config:
+        # Enable dynamic partition for automatic expiration
+        enable-partition: true
+        # HOUR / DAY / MONTH
+        partition-time-unit: DAY
+        # Number of history partitions to keep
+        partition-retention-days: 30
+        # Number of future partitions to pre-create
+        partition-future-days: 3
+        buckets: 8
+        replication-num: 3
+
+      pool-config:
+        minimum-idle: 5
+        maximum-pool-size: 20
+        connection-timeout: 30000
+
+      write-config:
+        # Strongly recommend stream mode in production for high throughput
+        write-mode: stream
+        batch-size: 1000
+        flush-interval: 5
+        stream-load-config:
+          # FE HTTP port
+          http-port: ":8030"
+          timeout: 60
+          max-bytes-per-batch: 10485760
+          # For complex networks (K8s/cross-domain): direct / public / private
+          redirect-policy: ""
+```
+
+### Switching to Stream Load Mode
+
+#### Production Environment Configuration
+
+For production deployments, **strongly recommend using Stream Load mode** to 
ensure high-performance large-scale writes. Stream Load writes directly to 
Doris storage layer, providing better throughput improvement compared to JDBC 
mode.
+
+#### Pre-Switch Checklist
+
+1. **Network Reachability**
+   - Ensure HertzBeat can access Doris FE HTTP port (default `8030`)
+   - If direct connection is not possible, configure BE endpoint labels in 
Doris
+
+2. **Special Configuration for Complex Network Scenarios**
+   
+   In K8s, cross-domain, or load-balanced environments, Stream Load's redirect 
mechanism requires special attention:
+   - FE redirects requests to an available BE, which must be reachable from 
HertzBeat
+   - Control returned BE address type via `redirect-policy`:
+     - `direct`: Direct BE IP connection
+     - `public`: Use public IP (cloud environments)
+     - `private`: Use private IP (private networks)
+     - Leave empty to use Doris default policy
+   
+   Reference: [Doris Stream Load in Complex 
Networks](https://doris.apache.org/zh-CN/docs/4.x/data-operate/import/load-internals/stream-load-in-complex-network)
+
+#### Switching Steps
+
+1. **Modify Configuration File**
+   
+   Edit `hertzbeat/config/application.yml` and change `write-mode` to `stream`:
+   ```yaml
+   warehouse:
+     store:
+       doris:
+         write-config:
+           write-mode: stream  # Change here: from jdbc to stream
+           stream-load-config:
+             http-port: ":8030"
+             timeout: 60
+             max-bytes-per-batch: 10485760
+             redirect-policy: ""  # Configure if complex network
+   ```
+
+2. **Restart HertzBeat Service**
+
+3. **Verify Successful Switch**
+
+Check HertzBeat logs for Stream Load messages
+
+#### Common Switching Questions
+
+**Q: Do I need to rebuild tables after switching?**
+
+A: No. Stream Load and JDBC modes use the same table structure, fully 
compatible.
+
+**Q: Will data be lost when switching from JDBC to Stream Load?**
+
+A: No. Both write modes are independent, historical data remains unchanged.
+
+**Q: How do I rollback if Stream Load fails?**
+
+A: If the stream processing fails, it will automatically try to use the jdbc 
mode for fallback writing
+
+**Q: Still getting timeouts in cross-network setup with redirect-policy 
configured?**
+
+A: Possible causes:
+- Returned BE address under current `redirect-policy` setting is unreachable
+- Try different `redirect-policy` values (`direct` / `public` / `private`)
+- Contact Doris admin to verify BE endpoint label configuration
+
+### Parameter Notes
+
+| Parameter | Description |
+| --- | --- |
+| `enabled` | Enable/disable Doris storage |
+| `url` | Doris FE MySQL JDBC endpoint |
+| `table-config.enable-partition` | Enable dynamic partition and automatic 
expiration |
+| `table-config.partition-time-unit` | Partition granularity: `HOUR` / `DAY` / 
`MONTH` |
+| `table-config.partition-retention-days` | Number of partitions retained |
+| `table-config.partition-future-days` | Number of future partitions 
pre-created |
+| `table-config.buckets` | Bucket count for table distribution |
+| `table-config.replication-num` | Replica count |
+| `write-config.write-mode` | `jdbc` or `stream` |
+| `write-config.batch-size` | Write batch size |
+| `write-config.flush-interval` | Flush interval in seconds |
+| `stream-load-config.http-port` | Doris FE HTTP port for Stream Load |
+| `stream-load-config.timeout` | Stream Load timeout in seconds |
+| `stream-load-config.max-bytes-per-batch` | Max bytes per stream-load batch |
+| `stream-load-config.redirect-policy` | Redirect policy for FE->BE endpoint 
selection: `direct` / `public` / `private` |
+
+### Restart HertzBeat
+
+After configuration changes, restart HertzBeat to apply Doris storage settings.
+
+### Verify Doris Storage Is Working
+
+1. Check HertzBeat logs for Stream Load success messages.
+2. Verify database and tables:
+
+```sql
+SHOW CREATE TABLE hertzbeat.hzb_history;
+SHOW CREATE TABLE hertzbeat.hzb_log;
+```
+
+3. If partition is enabled, check dynamic partition state:
+
+```sql
+SHOW DYNAMIC PARTITION TABLES FROM hertzbeat;
+SHOW PARTITIONS FROM hertzbeat.hzb_history;
+SHOW PARTITIONS FROM hertzbeat.hzb_log;
+```
+
+### FAQ
+
+1. Do I need to enable partition to use bucket distribution?
+
+   > No. Buckets work with or without dynamic partition. `enable-partition` 
only controls dynamic partition and automatic expiration.
+
+2. Can I use Doris for both metrics and logs at the same time?
+
+   > Yes. HertzBeat writes metrics into `hzb_history` and logs into `hzb_log` 
with the same Doris datasource configuration.
+
+3. If I change partition/bucket settings in `application.yml`, will existing 
tables auto-update?
+
+   > No. Existing Doris table DDL is not automatically altered. For 
schema-level changes, apply DDL manually or recreate tables.
+
+4. Is stream load compression enabled?
+
+   > Current implementation uses JSON stream load by default.
diff --git 
a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md 
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md
new file mode 100644
index 0000000000..9c2c072ef9
--- /dev/null
+++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md
@@ -0,0 +1,221 @@
+---
+id: doris-init
+title: 依赖时序数据库服务 Doris 安装初始化(可选)
+sidebar_label: 指标/日志数据存储 Doris
+---
+
+Apache HertzBeat™ 的历史数据存储依赖时序数据库,任选其一安装初始化即可,也可不安装(注意⚠️但强烈建议生产环境配置)。
+
+> 我们推荐使用并长期支持 Greptime 作为存储。
+
+Apache Doris 是一款面向实时分析场景的 MPP 数据库。在 HertzBeat 中,Doris 可用于同时存储:
+
+- 指标历史数据(`hzb_history`)
+- 日志数据(`hzb_log`)
+
+**⚠️ 若不配置时序数据库,则只会保留最近一小时历史数据。**
+
+> 如果您已有 Doris 环境,可直接跳到 YML 配置步骤。
+
+### 安装 Doris(可选)
+
+你可以通过安装包或 Docker 部署 Doris。生产环境建议参考官方部署文档:
+
+- Doris 官方文档:[Quick 
Start](https://doris.apache.org/docs/4.x/gettingStarted/quick-start/)
+
+对 HertzBeat 接入来说,至少需要保证:
+
+- FE MySQL 服务端口可访问(默认 `9030`)
+- FE HTTP 服务端口可访问(默认 `8030`)
+
+### 注意,必须添加 MYSQL jdbc 驱动 jar
+
+- 下载 MYSQL jdbc driver jar,例如 
mysql-connector-java-8.1.0.jar。[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0](https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0)
+- 将此 jar 包拷贝放入 HertzBeat 的安装目录下的 `ext-lib` 目录下。
+- 重启 HertzBeat 服务。
+
+### 前置检查
+
+1. Doris FE、BE 节点状态正常。
+2. HertzBeat 到 Doris 网络可达:
+   - FE MySQL 端口(默认 `9030`),用于建库建表、查询
+   - FE HTTP 端口(默认 `8030`),用于 Stream Load 写入
+3. 配置的 Doris 用户具有建库建表、写入、查询权限。
+
+### 在 HertzBeat `application.yml` 中配置 Doris
+
+1. 修改 `hertzbeat/config/application.yml`。
+
+   Docker 容器部署需将配置文件挂载到主机本地,安装包部署直接修改解压目录下配置文件即可。
+
+2. 配置 `warehouse.store.doris`(生产环境推荐配置使用 Stream Load 模式):
+
+```yaml
+warehouse:
+  store:
+    doris:
+      enabled: true
+      # Doris FE MySQL 连接地址
+      url: jdbc:mysql://127.0.0.1:9030
+      username: root
+      password:
+
+      table-config:
+        # 开启动态分区(用于自动过期)
+        enable-partition: true
+        # 支持 HOUR / DAY / MONTH
+        partition-time-unit: DAY
+        # 历史分区保留数量
+        partition-retention-days: 30
+        # 预创建未来分区数量
+        partition-future-days: 3
+        buckets: 8
+        replication-num: 3
+
+      pool-config:
+        minimum-idle: 5
+        maximum-pool-size: 20
+        connection-timeout: 30000
+
+      write-config:
+        # 生产环境强烈推荐使用 stream 模式以获得高吞吐性能
+        write-mode: stream
+        batch-size: 1000
+        flush-interval: 5
+        stream-load-config:
+          # Doris FE HTTP 端口
+          http-port: ":8030"
+          timeout: 60
+          max-bytes-per-batch: 10485760
+          # 复杂网络环境(K8s/跨域)需配置:direct / public / private
+          redirect-policy: ""
+```
+
+### 切换到 Stream Load 模式
+
+#### 生产环境推荐配置
+
+在生产环境部署时,**强烈推荐使用 Stream Load 模式** 以保证大规模写入性能。Stream Load 直接写入 Doris 存储层,相比 
JDBC 模式提供更高的吞吐量提升。
+
+#### 切换前的前置检查
+
+1. **网络可达性**
+   - 确保 HertzBeat 能访问 Doris FE HTTP 端口(默认 `8030`)
+   - 若无法直连,需在 Doris 侧配置 BE 公网/内网地址标签
+
+2. **复杂网络场景的特殊配置**
+   
+   在 K8s、跨域、负载均衡等环境下,Stream Load 的重定向机制需要特别注意:
+   - FE 会将请求重定向到某个可用的 BE,该 BE 地址必须对 HertzBeat 可达
+   - 通过配置 `redirect-policy` 来控制 FE 返回的 BE 地址类型:
+     - `direct`:直连 BE IP
+     - `public`:使用公网地址(云环境)
+     - `private`:使用内网地址(私有网络)
+     - 留空则使用 Doris 默认策略
+   
+   参考官方文档:[Doris Stream Load 
复杂网络原理](https://doris.apache.org/zh-CN/docs/4.x/data-operate/import/load-internals/stream-load-in-complex-network)
+
+#### 切换步骤
+
+1. **修改配置文件**
+   
+   编辑 `hertzbeat/config/application.yml`,将 `write-mode` 改为 `stream`:
+   ```yaml
+   warehouse:
+     store:
+       doris:
+         write-config:
+           write-mode: stream  # 改这里:从 jdbc 改为 stream
+           stream-load-config:
+             http-port: ":8030"
+             timeout: 60
+             max-bytes-per-batch: 10485760
+             redirect-policy: ""  # 复杂网络需配置这里
+   ```
+
+2. **重启 HertzBeat 服务**
+
+3. **验证切换成功**
+
+查看 HertzBeat 日志,应出现 Stream Load 相关日志
+
+#### 常见切换问题
+
+**Q: 切换后需要重建表吗?**
+
+A: 不需要。Stream Load 和 JDBC 模式使用同一套表结构,数据完全兼容。
+
+**Q: 从 JDBC 切换到 Stream Load 会丢数据吗?**
+
+A: 不会。两种模式的写入是独立的,历史数据保持不变。
+
+**Q: Stream Load 失败了怎么回退?**
+
+A: 如果流处理失败了会自动尝试使用jdbc模式进行回退写入
+
+**Q: 跨网络环境配置了 redirect-policy 仍然超时?**
+
+A: 可能原因:
+- 在当前 `redirect-policy` 设置下,返回的 BE 地址仍不可达
+- 尝试其他 `redirect-policy` 值(`direct` / `public` / `private`)
+- 联系 Doris 管理员确认 BE 节点的地址标签配置是否正确
+
+### 参数说明
+
+| 参数 | 说明 |
+| --- | --- |
+| `enabled` | 是否启用 Doris 存储 |
+| `url` | Doris FE MySQL JDBC 地址 |
+| `table-config.enable-partition` | 是否启用动态分区与自动过期 |
+| `table-config.partition-time-unit` | 分区时间粒度:`HOUR` / `DAY` / `MONTH` |
+| `table-config.partition-retention-days` | 历史分区保留数量 |
+| `table-config.partition-future-days` | 未来分区预创建数量 |
+| `table-config.buckets` | 分桶数量 |
+| `table-config.replication-num` | 副本数量 |
+| `write-config.write-mode` | 写入模式:`jdbc` 或 `stream` |
+| `write-config.batch-size` | 单批写入大小 |
+| `write-config.flush-interval` | 刷新间隔(秒) |
+| `stream-load-config.http-port` | Stream Load 使用的 FE HTTP 端口 |
+| `stream-load-config.timeout` | Stream Load 超时时间(秒) |
+| `stream-load-config.max-bytes-per-batch` | 单批最大字节数 |
+| `stream-load-config.redirect-policy` | FE->BE 地址返回策略:`direct` / `public` / 
`private` |
+
+### 重启 HertzBeat
+
+完成配置后,重启 HertzBeat 使配置生效。
+
+### 验证 Doris 存储是否生效
+
+1. 查看 HertzBeat 日志,确认出现 Stream Load 成功日志。
+2. 在 Doris 中检查建表是否完成:
+
+```sql
+SHOW CREATE TABLE hertzbeat.hzb_history;
+SHOW CREATE TABLE hertzbeat.hzb_log;
+```
+
+3. 若启用了动态分区,检查分区调度状态:
+
+```sql
+SHOW DYNAMIC PARTITION TABLES FROM hertzbeat;
+SHOW PARTITIONS FROM hertzbeat.hzb_history;
+SHOW PARTITIONS FROM hertzbeat.hzb_log;
+```
+
+### 常见问题
+
+1. 不开启分区还能分桶吗?
+
+   > 可以。分桶与是否开启动态分区无强依赖,`enable-partition` 主要影响动态分区和自动过期能力。
+
+2. Doris 能否同时存储指标和日志?
+
+   > 可以。HertzBeat 会将指标写入 `hzb_history`,日志写入 `hzb_log`,共用同一 Doris 数据源配置。
+
+3. 修改 `application.yml` 的分区/分桶参数后,旧表会自动更新吗?
+
+   > 不会。已存在表的 DDL 不会自动变更。需要手动执行 DDL 或重建表。
+
+4. 当前是否启用了 Stream Load 压缩?
+
+   > 当前实现默认使用 JSON Stream Load。


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to