This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/doris by this push:
new 558709ab4 fix(warehouse): resolve issues with Doris data storage
558709ab4 is described below
commit 558709ab4ef44bd735767aeba73417d999f92ac7
Author: zqr10159 <[email protected]>
AuthorDate: Mon Aug 26 17:43:26 2024 +0800
fix(warehouse): resolve issues with Doris data storage
Correct the SQL query string for creating databases and tables in
DorisDataStorage. Ensure compatibility with Doris by adjusting the JDBC URL
and setting SSL-related properties. Refactor history metric data retrieval
and data writing to Doris, including proper interval conversion handling and
JSON data structure adherence. Enhance logging by filtering out zero
timestamps and handling decimal precision for metric values.
---
.../store/history/doris/DorisDataStorage.java | 129 ++++++++++++++-------
.../store/history/doris/DorisProperties.java | 2 +-
.../warehouse/utils/DorisStreamLoadUtil.java | 4 +-
3 files changed, 91 insertions(+), 44 deletions(-)
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 9566bdf79..8a4baddd2 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -19,17 +19,23 @@
package org.apache.hertzbeat.warehouse.store.history.doris;
import com.zaxxer.hikari.HikariDataSource;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.time.DateUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
import org.apache.hertzbeat.warehouse.utils.DorisStreamLoadUtil;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,11 +46,17 @@ import org.springframework.stereotype.Component;
@ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled",
havingValue = "true")
@Slf4j
public class DorisDataStorage extends AbstractHistoryDataStorage {
+ private final static String CREATE_DATABASE_SQL = "CREATE DATABASE %s;";
+
private final static String CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS %s (
- ts datetime,
+ ts datetime(6),
+ monitor_id bigint(20),
+ metrics varchar(2000),
v VARIANT,
- INDEX idx_var(v) USING INVERTED PROPERTIES("parser" =
"unicode")
+ INDEX idx_var(v) USING INVERTED PROPERTIES("parser" =
"unicode"),
+ INDEX idx_id(monitor_id) USING INVERTED,
+ INDEX idx_metrics(metrics) USING INVERTED PROPERTIES("parser"
= "unicode")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
@@ -65,6 +77,8 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
);
""";
+ private static final String QUERY_HISTORY_SQL = "SELECT UNIX_TIMESTAMP(ts)
as ts, v['%s'], metrics FROM %s WHERE ts >= now() - interval '%s' and
monitor_id = %s `ORDER BY` ts `DESC`;";
+
private HikariDataSource hikariDataSource;
@Autowired
@@ -75,7 +89,8 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
return;
}
this.hikariDataSource = new HikariDataSource();
- this.hikariDataSource.setJdbcUrl("jdbc:mysql://" +
dorisProperties.host() + ":" + dorisProperties.jdbcPort() + "/" + "mysql");
+ this.hikariDataSource.setJdbcUrl("jdbc:mysql://" +
dorisProperties.host() + ":" + dorisProperties.jdbcPort()
+ +
"/mysql?useUnicode=true&characterEncoding=utf8&useTimezone=true&useSSL=false&allowPublicKeyRetrieval=true");
this.hikariDataSource.setUsername(dorisProperties.username());
this.hikariDataSource.setPassword(dorisProperties.password());
this.hikariDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
@@ -99,7 +114,7 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
private void createDatabase(String database) {
try (Connection connection = hikariDataSource.getConnection()) {
- connection.createStatement().execute("CREATE DATABASE IF NOT
EXISTS " + database);
+
connection.createStatement().executeUpdate(String.format(CREATE_DATABASE_SQL,
database));
log.info("[warehouse doris]--Create database {} successful",
database);
} catch (SQLException e) {
log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
@@ -110,14 +125,14 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
try (Connection connection = hikariDataSource.getConnection()) {
connection.createStatement().execute("USE " +
dorisProperties.database());
String createTableSql = String.format(CREATE_TABLE_SQL, tableName,
dorisProperties.expireTime());
- connection.createStatement().execute(createTableSql);
+ connection.createStatement().executeUpdate(createTableSql);
} catch (SQLException e) {
log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
}
}
- private String getTableName(String app, String metrics) {
- return "hzb_" + app + "_" + metrics;
+ private String getTableName(String app) {
+ return "hzb_" + app;
}
@@ -134,9 +149,59 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
*/
@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId,
String app, String metrics, String metric, String label, String history) {
+ Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+ if (!isServerAvailable()) {
+ return instanceValuesMap;
+ }
+
+ String interval = history2interval(history);
+ String selectSql = String.format(QUERY_HISTORY_SQL, getTableName(app),
metric, interval, monitorId);
+ try (Connection connection = hikariDataSource.getConnection()) {
+ connection.createStatement().execute("USE " +
dorisProperties.database());
+ ResultSet resultSet =
connection.createStatement().executeQuery(selectSql);
+ while (resultSet.next()) {
+ long ts = resultSet.getLong(1);
+ if (ts == 0) {
+ if (log.isErrorEnabled()) {
+ log.error("[warehouse doris] getHistoryMetricData
query result timestamp is 0, ignore. {}.",
+ selectSql);
+ }
+ continue;
+ }
+ String instanceValue = resultSet.getString(2);
+ if (instanceValue == null ||
StringUtils.isBlank(instanceValue)) {
+ instanceValue = "";
+ }
+ double value = resultSet.getDouble(3);
+ String strValue = double2decimalString(value);
+
+ List<Value> valueList =
instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
+ valueList.add(new Value(strValue, ts));
+ }
+ log.info("instanceValuesMap:{}", instanceValuesMap);
+ return instanceValuesMap;
+
+ } catch (SQLException e) {
+ log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+ }
return Map.of();
}
+ private String double2decimalString(double d) {
+ return BigDecimal.valueOf(d).setScale(4,
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+ }
+ private String history2interval(String history) {
+ if (history == null) {
+ return null;
+ }
+ history = history.trim().toLowerCase();
+ // Be careful, the order matters.
+ return history.replaceAll("d", " day") //
+ .replaceAll("s", " second") //
+ .replaceAll("w", " week") //
+ .replaceAll("h", " hour")//
+ .replaceAll("m", " minute");
+ }
/**
* query history range interval metrics data from doris
* max min mean metrics value
@@ -169,50 +234,32 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
return;
}
String monitorId = String.valueOf(metricsData.getId());
- String tableName = getTableName(metricsData.getApp(),
metricsData.getMetrics());
+ String tableName = getTableName(metricsData.getApp());
+ String metrics = metricsData.getMetrics();
createTable(tableName);
List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
- StringBuilder jsonData = new StringBuilder();
- jsonData.append("{");
+ Map<String, Object> dataMap = new HashMap<>();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
- StringBuilder row = new StringBuilder();
- row.append("\"monitor_id\":\"").append(monitorId).append("\",");
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
String value = valueRow.getColumns(i);
if (!CommonConstants.NULL_VALUE.equals(value)) {
- if (field.getType() == CommonConstants.TYPE_NUMBER) {
-
row.append("\"").append(field.getName()).append("\":").append(Double.parseDouble(value)).append(",");
- } else if (field.getType() == CommonConstants.TYPE_STRING)
{
-
row.append("\"").append(field.getName()).append("\":\"").append(value).append("\",");
- }
+ Object fieldValue = field.getType() ==
CommonConstants.TYPE_NUMBER
+ ? Double.parseDouble(value)
+ : value;
+ dataMap.put(field.getName(), fieldValue);
}
}
- row.append("},");
- jsonData.append(row);
+ break;
}
- // Remove the trailing comma and close the JSON object
- if (jsonData.charAt(jsonData.length() - 1) == ',') {
- jsonData.deleteCharAt(jsonData.length() - 1);
- } else {
- jsonData.append('}');
- }
-
- System.out.println(jsonData);
-
- String ts =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- String json =
- """
- {
- "ts": "%s",
- "v": %s
- }
- """.formatted(ts, jsonData);
-
- log.info("[warehouse doris]--Write data: {}", json);
-
+ Map<String, Object> outerMap = new HashMap<>();
+ outerMap.put("ts",
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss.SSS")));
+ outerMap.put("monitor_id", monitorId);
+ outerMap.put("metrics", metrics);
+ outerMap.put("v", dataMap);
+ String json = JsonUtil.toJson(outerMap);
try {
DorisStreamLoadUtil.sendData(
dorisProperties.host(),
@@ -223,7 +270,7 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
tableName,
json
);
- log.info("[warehouse doris]-Write successful");
+ log.info("[warehouse doris]-Write successful: {}", json);
} catch (Exception e) {
log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
index bd9a95356..41a5ad6b0 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
@@ -38,6 +38,6 @@ public record DorisProperties(@DefaultValue("false") boolean
enabled,
@DefaultValue("hertzbeat") String database,
@DefaultValue("hzb_history") String table,
// Database TTL, default is 30 days.
- @DefaultValue("30d") String expireTime) {
+ @DefaultValue("30") String expireTime) {
}
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
index e21439ffa..f1a930c8e 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
@@ -66,14 +66,14 @@ public class DorisStreamLoadUtil {
// format:json
put.setHeader("format","json");
// mode:array
- put.setHeader("strip_outer_array","true");
+ put.setHeader("strip_outer_array","false");
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
- log.info("doris stream load result: {}", loadResult);
+ log.debug("doris stream load result: {}", loadResult);
}
final int statusCode =
response.getStatusLine().getStatusCode();
if (statusCode != 200) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]