This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/doris by this push:
new 68281b01c ```feat(doris): implement dynamic partitioning and
standardized table naming
68281b01c is described below
commit 68281b01c62bc7e9685e99a0b3032fc1e30e6b56
Author: zqr10159 <[email protected]>
AuthorDate: Mon Aug 26 11:03:55 2024 +0800
```feat(doris): implement dynamic partitioning and standardized table naming
- Enable dynamic partitioning in DorisDataStorage with a configurable start
parameter.
- Standardize table naming convention by prefixing table names with
'hzb_'.
- Refactor JSON data formatting to include timestamp in Doris data
ingestion.
- Log Doris stream load result for better monitoring and troubleshooting.
```
---
.../store/history/doris/DorisDataStorage.java | 39 ++++++++++++++--------
.../warehouse/utils/DorisStreamLoadUtil.java | 3 +-
2 files changed, 27 insertions(+), 15 deletions(-)
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 27637ba25..9566bdf79 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -21,10 +21,12 @@ package org.apache.hertzbeat.warehouse.store.history.doris;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.HashMap;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
@@ -53,7 +55,7 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
- "dynamic_partition.start" = "-30",
+ "dynamic_partition.start" = "-%s",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "16",
@@ -107,14 +109,15 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
private void createTable(String tableName) {
try (Connection connection = hikariDataSource.getConnection()) {
connection.createStatement().execute("USE " +
dorisProperties.database());
- connection.createStatement().execute("CREATE TABLE IF NOT EXISTS "
+ tableName);
+ String createTableSql = String.format(CREATE_TABLE_SQL, tableName,
dorisProperties.expireTime());
+ connection.createStatement().execute(createTableSql);
} catch (SQLException e) {
log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
}
}
private String getTableName(String app, String metrics) {
- return app + "_" + metrics;
+ return "hzb_" + app + "_" + metrics;
}
@@ -167,14 +170,13 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
}
String monitorId = String.valueOf(metricsData.getId());
String tableName = getTableName(metricsData.getApp(),
metricsData.getMetrics());
-// createTable(tableName);
+ createTable(tableName);
List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
StringBuilder jsonData = new StringBuilder();
- jsonData.append("[");
+ jsonData.append("{");
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
StringBuilder row = new StringBuilder();
- row.append("{");
row.append("\"monitor_id\":\"").append(monitorId).append("\",");
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
@@ -187,18 +189,29 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
}
}
}
- row.append("\"ts\":").append(System.currentTimeMillis());
row.append("},");
jsonData.append(row);
}
- // Remove the trailing comma and close the JSON array
+ // Remove the trailing comma and close the JSON object
if (jsonData.charAt(jsonData.length() - 1) == ',') {
- jsonData.setCharAt(jsonData.length() - 1, ']');
+ jsonData.deleteCharAt(jsonData.length() - 1);
} else {
- jsonData.append(']');
+ jsonData.append('}');
}
- log.info("[warehouse doris]--Write data: {}", jsonData);
+
+ System.out.println(jsonData);
+
+ String ts =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ String json =
+ """
+ {
+ "ts": "%s",
+ "v": %s
+ }
+ """.formatted(ts, jsonData);
+
+ log.info("[warehouse doris]--Write data: {}", json);
try {
DorisStreamLoadUtil.sendData(
@@ -208,7 +221,7 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
dorisProperties.password(),
dorisProperties.database(),
tableName,
- jsonData.toString()
+ json
);
log.info("[warehouse doris]-Write successful");
} catch (Exception e) {
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
index 125855976..e21439ffa 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
@@ -67,14 +67,13 @@ public class DorisStreamLoadUtil {
put.setHeader("format","json");
// mode:array
put.setHeader("strip_outer_array","true");
- // ignore error
- put.setHeader("max_filter_ratio","0.1");
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
+ log.info("doris stream load result: {}", loadResult);
}
final int statusCode =
response.getStatusLine().getStatusCode();
if (statusCode != 200) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]