This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/doris by this push:
     new b11cea90c fix(warehouse): update Doris database name to 'hertzbeat'
b11cea90c is described below

commit b11cea90c79ba9b75e114bbb59372f7a5bda452e
Author: zqr10159 <[email protected]>
AuthorDate: Sun Aug 25 20:10:21 2024 +0800

    fix(warehouse): update Doris database name to 'hertzbeat'
    
    Change the default database name in DorisDataStorage and DorisProperties
    from _internal_schema to hertzbeat to ensure metrics data is stored in
    the correct database. This aligns with the intended configuration for
    history data storage in Doris.
---
 manager/src/main/resources/application.yml         |   2 +-
 .../store/history/doris/DorisDataStorage.java      | 115 +++++++++++++++++++++
 .../store/history/doris/DorisProperties.java       |   3 +-
 .../warehouse/utils/DorisStreamLoadUtil.java       |   6 +-
 4 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/manager/src/main/resources/application.yml 
b/manager/src/main/resources/application.yml
index 1902ae6ec..10f389150 100644
--- a/manager/src/main/resources/application.yml
+++ b/manager/src/main/resources/application.yml
@@ -171,7 +171,7 @@ warehouse:
       httpPort: 8030
       username: root
       password:
-      db: _internal_schema
+      database: hertzbeat
       table: hzb_history
       expire-time: '30d'
   # store real-time metrics data, enable only one below
diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 3a92892de..c732247c3 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -18,12 +18,19 @@
 
 package org.apache.hertzbeat.warehouse.store.history.doris;
 
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.Value;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
+import org.apache.hertzbeat.warehouse.utils.DorisStreamLoadUtil;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
@@ -31,6 +38,59 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled", 
havingValue = "true")
 @Slf4j
 public class DorisDataStorage extends AbstractHistoryDataStorage {
+
+    private HikariDataSource hikariDataSource;
+
+    @Autowired
+    private DorisProperties dorisProperties;
+
+    public DorisDataStorage(DorisProperties dorisProperties) {
+        if (!dorisProperties.enabled()) {
+            return;
+        }
+        this.hikariDataSource = new HikariDataSource();
+        this.hikariDataSource.setJdbcUrl("jdbc:mysql://" + 
dorisProperties.host() + ":" + dorisProperties.jdbcPort() + "/" + 
dorisProperties.database());
+        this.hikariDataSource.setUsername(dorisProperties.username());
+        this.hikariDataSource.setPassword(dorisProperties.password());
+        this.hikariDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
+        // minimum number of idle connection
+        this.hikariDataSource.setMinimumIdle(10);
+        // maximum number of connection in the pool
+        this.hikariDataSource.setMaximumPoolSize(10);
+        // maximum wait milliseconds for get connection from pool
+        this.hikariDataSource.setConnectionTimeout(30000);
+        // maximum lifetime for each connection
+        this.hikariDataSource.setMaxLifetime(0);
+        // max idle time for recycle idle connection
+        this.hikariDataSource.setIdleTimeout(0);
+        // validation query
+        this.hikariDataSource.setConnectionTestQuery("select 1");
+        createDatabase(dorisProperties.database());
+
+    }
+
+    private void createDatabase(String database) {
+        try (Connection connection = hikariDataSource.getConnection()) {
+            connection.createStatement().execute("CREATE DATABASE IF NOT 
EXISTS " + database);
+        } catch (SQLException e) {
+            log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+        }
+    }
+
+    private void createTable(String database, String tableName) {
+        try (Connection connection = hikariDataSource.getConnection()) {
+            connection.createStatement().execute("USE " + database);
+            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS " 
+ tableName);
+        } catch (SQLException e) {
+            log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+        }
+    }
+
+    private String getTableName(String app, String metrics) {
+        return app + "_" + metrics;
+    }
+
+
     /**
      * query history range metrics data from doris
      *
@@ -71,9 +131,64 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
      */
     @Override
     public void saveData(CollectRep.MetricsData metricsData) {
+        if (!isServerAvailable() || metricsData.getCode() != 
CollectRep.Code.SUCCESS) {
+            return;
+        }
+        if (metricsData.getValuesList().isEmpty()) {
+            log.info("[warehouse doris] flush metrics data {} is null, 
ignore.", metricsData.getId());
+            return;
+        }
+        String monitorId = String.valueOf(metricsData.getId());
+        String tableName = getTableName(metricsData.getApp(), 
metricsData.getMetrics());
+        createTable(dorisProperties.database(), tableName);
+        List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
+        StringBuilder jsonData = new StringBuilder();
+        jsonData.append("[");
+
+        for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
+            StringBuilder row = new StringBuilder();
+            row.append("{");
+            row.append("\"monitor_id\":\"").append(monitorId).append("\",");
+            for (int i = 0; i < fieldsList.size(); i++) {
+                CollectRep.Field field = fieldsList.get(i);
+                String value = valueRow.getColumns(i);
+                if (!CommonConstants.NULL_VALUE.equals(value)) {
+                    if (field.getType() == CommonConstants.TYPE_NUMBER) {
+                        
row.append("\"").append(field.getName()).append("\":").append(Double.parseDouble(value)).append(",");
+                    } else if (field.getType() == CommonConstants.TYPE_STRING) 
{
+                        
row.append("\"").append(field.getName()).append("\":\"").append(value).append("\",");
+                    }
+                }
+            }
+            row.append("\"ts\":").append(System.currentTimeMillis());
+            row.append("},");
+            jsonData.append(row);
+        }
 
+        // Remove the trailing comma and close the JSON array
+        if (jsonData.charAt(jsonData.length() - 1) == ',') {
+            jsonData.setCharAt(jsonData.length() - 1, ']');
+        } else {
+            jsonData.append(']');
+        }
+
+        try {
+            DorisStreamLoadUtil.sendData(
+                    dorisProperties.host(),
+                    dorisProperties.httpPort(),
+                    dorisProperties.username(),
+                    dorisProperties.password(),
+                    dorisProperties.database(),
+                    tableName,
+                    jsonData.toString()
+            );
+            log.debug("[warehouse doris]-Write successful");
+        } catch (Exception e) {
+            log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+        }
     }
 
+
     @Override
     public void destroy() throws Exception {
 
diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
index b3118ceb6..bd9a95356 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
@@ -32,9 +32,10 @@ import 
org.springframework.boot.context.properties.bind.DefaultValue;
 public record DorisProperties(@DefaultValue("false") boolean enabled,
                               @DefaultValue("127.0.0.1") String host,
                               @DefaultValue("8030") int httpPort,
+                              @DefaultValue("9030") int jdbcPort,
                               @DefaultValue("root") String username,
                               @DefaultValue("") String password,
-                              @DefaultValue("_internal_schema") String db,
+                              @DefaultValue("hertzbeat") String database,
                               @DefaultValue("hzb_history") String table,
                               // Database TTL, default is 30 days.
                               @DefaultValue("30d") String expireTime) {
diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
index 464569c28..125855976 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
@@ -19,6 +19,7 @@ package org.apache.hertzbeat.warehouse.utils;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpHeaders;
@@ -32,19 +33,20 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 
 @Slf4j
+@Data
 public class DorisStreamLoadUtil {
 
     public static void sendData(String host,
                                 int httpPort,
                                 String username,
                                 String password,
-                                String db,
+                                String database,
                                 String table,
                                 String content) throws Exception {
         final String loadUrl = 
String.format("http://%s:%s/api/%s/%s/_stream_load",
                 host,
                 httpPort,
-                db,
+                database,
                 table);
 
         final HttpClientBuilder httpClientBuilder = HttpClients


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to