This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/doris by this push:
     new 68281b01c ```feat(doris): implement dynamic partitioning and 
standardized table naming
68281b01c is described below

commit 68281b01c62bc7e9685e99a0b3032fc1e30e6b56
Author: zqr10159 <[email protected]>
AuthorDate: Mon Aug 26 11:03:55 2024 +0800

    ```feat(doris): implement dynamic partitioning and standardized table naming
    
    - Enable dynamic partitioning in DorisDataStorage with a configurable start
      parameter.
    - Standardize table naming convention by prefixing table names with 'hzb_'.
    - Refactor JSON data formatting to include timestamp in Doris data
      ingestion.
    - Log Doris stream load result for better monitoring and troubleshooting.
    ```
---
 .../store/history/doris/DorisDataStorage.java      | 39 ++++++++++++++--------
 .../warehouse/utils/DorisStreamLoadUtil.java       |  3 +-
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 27637ba25..9566bdf79 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -21,10 +21,12 @@ package org.apache.hertzbeat.warehouse.store.history.doris;
 import com.zaxxer.hikari.HikariDataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.HashMap;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.Value;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
@@ -53,7 +55,7 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
             "dynamic_partition.enable" = "true",
             "dynamic_partition.create_history_partition" = "true",
             "dynamic_partition.time_unit" = "DAY",
-            "dynamic_partition.start" = "-30",
+            "dynamic_partition.start" = "-%s",
             "dynamic_partition.end" = "1",
             "dynamic_partition.prefix" = "p",
             "dynamic_partition.buckets" = "16",
@@ -107,14 +109,15 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
     private void createTable(String tableName) {
         try (Connection connection = hikariDataSource.getConnection()) {
             connection.createStatement().execute("USE " + 
dorisProperties.database());
-            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS " 
+ tableName);
+            String createTableSql = String.format(CREATE_TABLE_SQL, tableName, 
dorisProperties.expireTime());
+            connection.createStatement().execute(createTableSql);
         } catch (SQLException e) {
             log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
         }
     }
 
     private String getTableName(String app, String metrics) {
-        return app + "_" + metrics;
+        return "hzb_" + app + "_" + metrics;
     }
 
 
@@ -167,14 +170,13 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
         }
         String monitorId = String.valueOf(metricsData.getId());
         String tableName = getTableName(metricsData.getApp(), 
metricsData.getMetrics());
-//        createTable(tableName);
+        createTable(tableName);
         List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
         StringBuilder jsonData = new StringBuilder();
-        jsonData.append("[");
+        jsonData.append("{");
 
         for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
             StringBuilder row = new StringBuilder();
-            row.append("{");
             row.append("\"monitor_id\":\"").append(monitorId).append("\",");
             for (int i = 0; i < fieldsList.size(); i++) {
                 CollectRep.Field field = fieldsList.get(i);
@@ -187,18 +189,29 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
                     }
                 }
             }
-            row.append("\"ts\":").append(System.currentTimeMillis());
             row.append("},");
             jsonData.append(row);
         }
 
-        // Remove the trailing comma and close the JSON array
+        // Remove the trailing comma and close the JSON object
         if (jsonData.charAt(jsonData.length() - 1) == ',') {
-            jsonData.setCharAt(jsonData.length() - 1, ']');
+            jsonData.deleteCharAt(jsonData.length() - 1);
         } else {
-            jsonData.append(']');
+            jsonData.append('}');
         }
-        log.info("[warehouse doris]--Write data: {}", jsonData);
+
+        System.out.println(jsonData);
+
+        String ts = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+        String json =
+            """
+                {
+                  "ts": "%s",
+                  "v": %s
+                }
+            """.formatted(ts, jsonData);
+
+        log.info("[warehouse doris]--Write data: {}", json);
 
         try {
             DorisStreamLoadUtil.sendData(
@@ -208,7 +221,7 @@ public class DorisDataStorage extends 
AbstractHistoryDataStorage {
                     dorisProperties.password(),
                     dorisProperties.database(),
                     tableName,
-                    jsonData.toString()
+                    json
             );
             log.info("[warehouse doris]-Write successful");
         } catch (Exception e) {
diff --git 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
index 125855976..e21439ffa 100644
--- 
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
+++ 
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
@@ -67,14 +67,13 @@ public class DorisStreamLoadUtil {
             put.setHeader("format","json");
             // mode:array
             put.setHeader("strip_outer_array","true");
-            // ignore error
-            put.setHeader("max_filter_ratio","0.1");
             put.setEntity(entity);
 
             try (CloseableHttpResponse response = client.execute(put)) {
                 String loadResult = "";
                 if (response.getEntity() != null) {
                     loadResult = EntityUtils.toString(response.getEntity());
+                    log.info("doris stream load result: {}", loadResult);
                 }
                 final int statusCode = 
response.getStatusLine().getStatusCode();
                 if (statusCode != 200) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to