zqr10159 commented on code in PR #4031:
URL: https://github.com/apache/hertzbeat/pull/4031#discussion_r2845250104


##########
home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/doris-init.md:
##########
@@ -0,0 +1,160 @@
+---
+id: doris-init
+title: 依赖时序数据库服务 Doris 安装初始化(可选)
+sidebar_label: 指标/日志数据存储 Doris
+---
+
+Apache HertzBeat™ 的历史数据存储依赖时序数据库,任选其一安装初始化即可,也可不安装(注意⚠️但强烈建议生产环境配置)。
+
+> 我们推荐使用并长期支持 Greptime 作为存储。
+
+Apache Doris 是一款面向实时分析场景的 MPP 数据库。在 HertzBeat 中,Doris 可用于同时存储:
+
+- 指标历史数据(`hzb_history`)
+- 日志数据(`hzb_log`)
+
+**⚠️ 若不配置时序数据库,则只会保留最近一小时历史数据。**
+
+> 如果您已有 Doris 环境,可直接跳到 YML 配置步骤。
+
+### 安装 Doris(可选)
+
+你可以通过安装包或 Docker 部署 Doris。生产环境建议参考官方部署文档:
+
+- Doris 官方文档:[Quick Start](https://doris.apache.org/docs/4.x/gettingStarted/quick-start/)
+
+对 HertzBeat 接入来说,至少需要保证:
+
+- FE MySQL 服务端口可访问(默认 `9030`)
+- FE HTTP 服务端口可访问(默认 `8030`)
+
+### 前置检查

Review Comment:
   The final HertzBeat release package does not bundle the MySQL JDBC driver. 
This documentation must therefore include instructions for providing the MySQL 
JDBC driver; see the MySQL monitoring documentation for reference.



##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/doris/DorisDataStorage.java:
##########
@@ -0,0 +1,1250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.warehouse.store.history.tsdb.doris;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.constants.MetricDataConstants;
+import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
+import org.apache.hertzbeat.common.entity.dto.Value;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
+import 
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Apache Doris data storage.
+ * Supports two write modes:
+ * - jdbc: Traditional JDBC batch insert (default, suitable for small to 
medium scale)
+ * - stream: HTTP Stream Load API (high throughput, suitable for large scale)
+ */
+@Component
+@ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled", 
havingValue = "true")
+@Slf4j
+public class DorisDataStorage extends AbstractHistoryDataStorage {
+
+    private static final String DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
+    private static final String DATABASE_NAME = "hertzbeat";
+    private static final String TABLE_NAME = "hzb_history";
+    private static final String LOG_TABLE_NAME = "hzb_log";
+    private static final String WRITE_MODE_JDBC = "jdbc";
+    private static final String WRITE_MODE_STREAM = "stream";
+
+    /**
+     * Metric type constants
+     */
+    private static final byte METRIC_TYPE_NUMBER = 1;
+    private static final byte METRIC_TYPE_STRING = 2;
+    private static final byte METRIC_TYPE_TIME = 3;
+
+    private static final String LABEL_KEY_START_TIME = "start";
+    private static final String LABEL_KEY_END_TIME = "end";
+    private static final int MAX_QUERY_LIMIT = 20000;
+    private static final int LOG_BATCH_SIZE = 1000;
+    private static final long NANOS_PER_MILLISECOND = 1_000_000L;
+    private static final long MAX_WAIT_MS = 500L;
+    private static final int MAX_RETRIES = 3;
+    private static final String METRIC_BLOOM_FILTER_COLUMNS = 
"instance,app,metrics,metric";
+    private static final String LOG_BLOOM_FILTER_COLUMNS = 
"trace_id,span_id,severity_number,severity_text";
+
+    private final DorisProperties properties;
+    private HikariDataSource dataSource;
+
+    private final BlockingQueue<DorisMetricRow> metricsBufferQueue;
+    private final DorisProperties.WriteConfig writeConfig;
+    private final WarehouseWorkerPool warehouseWorkerPool;
+    private String writeMode;
+    private final AtomicBoolean draining = new AtomicBoolean(false);
+    private volatile boolean flushThreadRunning = true;
+    private volatile boolean flushTaskStarted;
+    private final CountDownLatch flushTaskStopped = new CountDownLatch(1);
+
+    // Stream Load writer (only used when writeMode is "stream")
+    private DorisStreamLoadWriter streamLoadWriter;
+    private DorisStreamLoadWriter logStreamLoadWriter;
+
+    public DorisDataStorage(DorisProperties dorisProperties, 
WarehouseWorkerPool warehouseWorkerPool) {
+        if (dorisProperties == null) {
+            log.error("[Doris] Init error, please config Warehouse Doris props 
in application.yml");
+            throw new IllegalArgumentException("please config Warehouse Doris 
props");
+        }
+        if (warehouseWorkerPool == null) {
+            throw new IllegalArgumentException("please config 
WarehouseWorkerPool bean");
+        }
+        this.properties = dorisProperties;
+        this.warehouseWorkerPool = warehouseWorkerPool;
+        this.writeConfig = dorisProperties.writeConfig();
+        this.writeMode = writeConfig.writeMode();
+        this.metricsBufferQueue = new 
LinkedBlockingQueue<>(writeConfig.batchSize() * 10);
+
+        serverAvailable = initDorisConnection();
+        if (serverAvailable) {
+            // Initialize Stream Load writer if using stream mode
+            if (WRITE_MODE_STREAM.equals(writeMode)) {
+                initStreamLoadWriter();
+            }
+            startFlushThread();
+        }
+
+        log.info("[Doris] Initialized with write mode: {}", writeMode);
+    }
+
+    /**
+     * Initialize Doris connection and table in one go.
+     */
+    private boolean initDorisConnection() {
+        try {
+            Class.forName(DRIVER_NAME);
+
+            DorisProperties.PoolConfig poolConfig = properties.poolConfig();
+            String baseUrl = properties.url();
+
+            // Step 1: First connect without database to create it if needed
+            try (Connection initConn = java.sql.DriverManager.getConnection(
+                    baseUrl, properties.username(), properties.password());
+                 Statement stmt = initConn.createStatement()) {
+                // Create database if not exists
+                stmt.execute("CREATE DATABASE IF NOT EXISTS " + DATABASE_NAME);
+                log.info("[Doris] Database {} ensured", DATABASE_NAME);
+            }
+
+            // Step 2: Build URL with database
+            String urlWithDb = baseUrl.endsWith("/") ? baseUrl + DATABASE_NAME 
: baseUrl + "/" + DATABASE_NAME;
+
+            // Step 3: Create connection pool with database
+            HikariConfig config = getHikariConfig(urlWithDb, poolConfig);
+
+            this.dataSource = new HikariDataSource(config);
+
+            // Step 4: Create table
+            try (Connection conn = dataSource.getConnection();
+                 Statement stmt = conn.createStatement()) {
+                stmt.execute(buildCreateTableSql());
+                log.info("[Doris] Table {} ensured", TABLE_NAME);
+                stmt.execute(buildCreateLogTableSql());
+                log.info("[Doris] Table {} ensured", LOG_TABLE_NAME);
+            }
+
+            log.info("[Doris] Initialized: {}", urlWithDb);
+            return true;
+        } catch (ClassNotFoundException e) {
+            log.error("[Doris] MySQL JDBC driver not found.", e);
+        } catch (Exception e) {
+            log.error("[Doris] Failed to initialize: {}", e.getMessage(), e);
+        }
+        return false;
+    }
+
+    @NotNull
+    private HikariConfig getHikariConfig(String urlWithDb, 
DorisProperties.PoolConfig poolConfig) {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(urlWithDb);
+        config.setUsername(properties.username());
+        config.setPassword(properties.password());
+        config.setDriverClassName(DRIVER_NAME);
+        config.setMinimumIdle(poolConfig.minimumIdle());
+        config.setMaximumPoolSize(poolConfig.maximumPoolSize());
+        config.setConnectionTimeout(poolConfig.connectionTimeout());
+        config.setMaxLifetime(poolConfig.maxLifetime());
+        config.setIdleTimeout(poolConfig.idleTimeout());
+        config.setConnectionTestQuery("SELECT 1");
+        config.setPoolName("Doris-HikariCP");
+        return config;
+    }
+
+    /**
+     * Initialize Stream Load writer
+     */
+    private void initStreamLoadWriter() {
+        try {
+            this.streamLoadWriter = new DorisStreamLoadWriter(
+                    DATABASE_NAME, TABLE_NAME, properties.url(),
+                    properties.username(), properties.password(),
+                    writeConfig.streamLoadConfig()
+            );
+            if (streamLoadWriter.isAvailable()) {
+                log.info("[Doris] Stream Load writer initialized.");
+            } else {
+                log.warn("[Doris] Stream Load writer failed, fallback to 
JDBC.");
+                this.writeMode = WRITE_MODE_JDBC;
+            }
+
+            this.logStreamLoadWriter = new DorisStreamLoadWriter(
+                    DATABASE_NAME, LOG_TABLE_NAME, properties.url(),
+                    properties.username(), properties.password(),
+                    writeConfig.streamLoadConfig()
+            );
+            if (logStreamLoadWriter.isAvailable()) {
+                log.info("[Doris] Log Stream Load writer initialized.");
+            } else {
+                log.warn("[Doris] Log Stream Load writer unavailable, log 
writes fallback to JDBC.");
+                this.logStreamLoadWriter = null;
+            }
+        } catch (Exception e) {
+            log.error("[Doris] Stream Load init failed: {}", e.getMessage(), 
e);
+            this.writeMode = WRITE_MODE_JDBC;
+        }
+    }
+
+    /**
+     * Build CREATE TABLE SQL statement
+     */
+    private String buildCreateTableSql() {
+        DorisProperties.TableConfig config = properties.tableConfig();
+
+        StringBuilder sql = new StringBuilder();

Review Comment:
   Consider using template strings instead of StringBuilder? The former is 
easier to maintain.



##########
hertzbeat-startup/src/main/resources/application.yml:
##########
@@ -225,6 +225,38 @@ warehouse:
       password: root
       expire-time: '30d'
       replication: 1
+    doris:
+      enabled: false
+      url: jdbc:mysql://127.0.0.1:9030
+      username: root
+      password:
+      table-config:
+        enable-partition: false
+        partition-time-unit: DAY
+        partition-retention-days: 30
+        partition-future-days: 3
+        buckets: 8
+        replication-num: 1
+      pool-config:
+        minimum-idle: 5
+        maximum-pool-size: 20
+        connection-timeout: 30000
+      write-config:
+        # Write mode: jdbc (default, suitable for small/medium scale) or stream (high throughput)
+        write-mode: jdbc

Review Comment:
   It seems Stream Load has the highest priority in the code: if Stream Load is 
unavailable, it falls back to JDBC mode. So why is JDBC the default here? Users 
who are unfamiliar with Doris's write capabilities will keep the default, and 
JDBC mode can hurt performance during large-scale writes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to