This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/doris by this push:
new 61df0b95c feat(warehouse): create table dynamically in DorisDataStorage
61df0b95c is described below
commit 61df0b95c76d74014d796a7fd84633e100818212
Author: zqr10159 <[email protected]>
AuthorDate: Wed Aug 28 00:47:26 2024 +0800
feat(warehouse): create table dynamically in DorisDataStorage

DorisDataStorage now creates the database and table dynamically based on
the incoming metrics data, removing the need to create them manually in
the Doris database beforehand. Connection validation and table creation
have been adapted to support this auto-configuration.
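
In practice this means the storage layer can bootstrap its own schema before
the first write. A rough standalone sketch of the idea follows; the class,
method, and parameter names below are illustrative assumptions, not the
committed code (the real wiring lives in DorisDataStorage and uses the DDL
shown in the diff, with the table name derived via getTableName(app)):

import java.sql.Connection;
import java.sql.Statement;
import javax.sql.DataSource;

public class DorisSchemaBootstrap {

    private final DataSource dataSource;
    private final String database;

    public DorisSchemaBootstrap(DataSource dataSource, String database) {
        this.dataSource = dataSource;
        this.database = database;
    }

    /**
     * Ensures the target database and the per-app table exist before any write.
     * createTableSqlTemplate is assumed to be CREATE TABLE DDL with a %s
     * placeholder for the table name.
     */
    public void ensureSchema(String tableName, String createTableSqlTemplate) throws Exception {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            // Doris accepts MySQL-style IF NOT EXISTS clauses, so the bootstrap
            // is idempotent and safe to repeat on every startup.
            statement.execute("CREATE DATABASE IF NOT EXISTS " + database);
            statement.execute("USE " + database);
            statement.execute(String.format(createTableSqlTemplate, tableName));
        }
    }
}

The patch itself folds this kind of logic into the existing connection
validation path, as the diff below shows.
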
---
.../store/history/doris/DorisDataStorage.java | 134 ++++++++++++++++++---
1 file changed, 119 insertions(+), 15 deletions(-)
diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 24250e18d..150926697 100644
--- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -24,8 +24,11 @@ import java.math.RoundingMode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -53,10 +56,12 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
ts datetime(6),
monitor_id bigint(20),
metrics varchar(2000),
+ label varchar(2000),
v VARIANT,
INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "unicode"),
INDEX idx_id(monitor_id) USING INVERTED,
- INDEX idx_metrics(metrics) USING INVERTED PROPERTIES("parser" = "unicode")
+ INDEX idx_metrics(metrics) USING INVERTED PROPERTIES("parser" = "unicode"),
+ INDEX idx_label(label) USING INVERTED PROPERTIES("parser" = "unicode")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
@@ -77,9 +82,43 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
);
""";
- private static final String QUERY_HISTORY_SQL = "SELECT UNIX_TIMESTAMP(ts) * 1000 as ts, v['%s'], metrics FROM %s WHERE ts >= now() - interval %s and monitor_id = %s order by ts desc;";
+ private static final String QUERY_HISTORY_SQL = "SELECT UNIX_TIMESTAMP(ts) * 1000 as ts, v['%s'], metrics, label FROM %s WHERE ts >= now() - interval %s and monitor_id = %s order by ts desc;";
+
+ private static final String QUERY_HISTORY_WITH_LABEL_SQL = "SELECT UNIX_TIMESTAMP(ts) * 1000 as ts, v['%s'], metrics, label FROM %s WHERE ts >= now() - interval %s and monitor_id = %s and label = '%s' order by ts desc;";
+
+ private static final String QUERY_INSTANCE_SQL = "SELECT DISTINCT label FROM %s WHERE ts >= now() - interval 1 WEEK and metrics = '%s';";
+
+ private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL = """
+ WITH base_data AS (
+ SELECT
+ UNIX_TIMESTAMP(ts) * 1000 AS ts,
+ CAST(v['%s'] AS DOUBLE) AS value
+ FROM
+ %s
+ WHERE
+ label = '%s'
+ AND ts >= DATE_SUB(NOW(), INTERVAL 4 HOUR)
+ AND v['%s'] IS NOT NULL
+ ),
+ first_value_cte AS (
+ SELECT
+ ts,
+ value,
+ FIRST_VALUE(value) OVER (ORDER BY ts ASC) AS first
+ FROM
+ base_data
+ )
+ SELECT
+ MIN(ts) AS first_ts,
+ MIN(first) AS first_value,
+ AVG(value) AS avg,
+ MIN(value) AS min,
+ MAX(value) AS max
+ FROM
+ first_value_cte;
+
+ """;
- private static final String QUERY_HISTORY_WITH_INSTANCE_SQL = "SELECT UNIX_TIMESTAMP(ts) * 1000 as ts, v['%s'], metrics FROM %s WHERE ts >= now() - interval %s and monitor_id = %s and metrics = '%s' order by ts desc;";
private HikariDataSource hikariDataSource;
@@ -90,9 +129,11 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
if (!dorisProperties.enabled()) {
return;
}
+ ZoneId systemZoneId = ZoneId.systemDefault();
+ String zoneIdString = systemZoneId.getId();
this.hikariDataSource = new HikariDataSource();
this.hikariDataSource.setJdbcUrl("jdbc:mysql://" + dorisProperties.host() + ":" + dorisProperties.jdbcPort()
- + "/mysql?useUnicode=true&characterEncoding=utf8&useTimezone=true&useSSL=false&allowPublicKeyRetrieval=true");
+ + "/mysql?useUnicode=true&characterEncoding=utf8&useTimezone=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=" + zoneIdString);
this.hikariDataSource.setUsername(dorisProperties.username());
this.hikariDataSource.setPassword(dorisProperties.password());
this.hikariDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
@@ -156,7 +197,7 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
String interval = history2interval(history);
String selectSql = label == null ? String.format(QUERY_HISTORY_SQL, metric, getTableName(app), interval, monitorId)
- : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, getTableName(app), interval, monitorId, label);
+ : String.format(QUERY_HISTORY_WITH_LABEL_SQL, metric, getTableName(app), interval, monitorId, label);
try (Connection connection = hikariDataSource.getConnection()) {
connection.createStatement().execute("USE " +
dorisProperties.database());
ResultSet resultSet =
connection.createStatement().executeQuery(selectSql);
@@ -166,7 +207,7 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
double value = resultSet.getDouble(2);
String strValue = double2decimalString(value);
- String instanceValue = resultSet.getString(3);
+ String instanceValue = resultSet.getString(4);
if (instanceValue == null || StringUtils.isBlank(instanceValue)) {
instanceValue = "";
}
@@ -195,8 +236,64 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
*/
@Override
public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, String metric, String label, String history) {
- return Map.of();
- }
+ if (!isServerAvailable()) {
+ return Collections.emptyMap();
+ }
+ List<String> instances = new LinkedList<>();
+ if (label != null && !StringUtils.isBlank(label)) {
+ instances.add(label);
+ }
+ String table = getTableName(app);
+
+ if (instances.isEmpty()) {
+ String selectSql = String.format(QUERY_INSTANCE_SQL, table, metrics);
+ try (Connection connection = hikariDataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(selectSql)) {
+ while (resultSet.next()) {
+ String instanceValue = resultSet.getString(1);
+ if (instanceValue == null || StringUtils.isBlank(instanceValue)) {
+ instances.add("''");
+ } else {
+ instances.add(instanceValue);
+ }
+ }
+ } catch (Exception e) {
+ log.error("[warehouse doris] failed to query instances{}",
e.getMessage(), e);
+ }
+ }
+ Map<String, List<Value>> instanceValuesMap = new HashMap<>(instances.size());
+ for (String instanceValue : instances) {
+ String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, metric, table, instanceValue, metric);
+
+ if (log.isDebugEnabled()) {
+ log.debug("[warehouse greptime] getHistoryIntervalMetricData
sql: {}", selectSql);
+ }
+
+ List<Value> values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
+ try (Connection connection = hikariDataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(selectSql)) {
+ while (resultSet.next()) {
+ long ts = resultSet.getLong(1);
+ double origin = resultSet.getDouble(2);
+ String originStr = double2decimalString(origin);
+ double avg = resultSet.getDouble(3);
+ String avgStr = double2decimalString(avg);
+ double min = resultSet.getDouble(4);
+ String minStr = double2decimalString(min);
+ double max = resultSet.getDouble(5);
+ String maxStr = double2decimalString(max);
+ Value value = Value.builder().origin(originStr).mean(avgStr).min(minStr).max(maxStr).time(ts)
+ .build();
+ values.add(value);
+ }
+ } catch (Exception e) {
+ log.error("[warehouse greptime] failed to
getHistoryIntervalMetricData: {}", e.getMessage(), e);
+ }
+ }
+ return instanceValuesMap;
+ }
/**
* save metrics data
@@ -219,24 +316,31 @@ public class DorisDataStorage extends AbstractHistoryDataStorage {
List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
Map<String, Object> dataMap = new HashMap<>();
+ Map<String, String> labels = new HashMap<>(8);
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
for (int i = 0; i < fieldsList.size(); i++) {
CollectRep.Field field = fieldsList.get(i);
- String value = valueRow.getColumns(i);
- if (!CommonConstants.NULL_VALUE.equals(value)) {
- Object fieldValue = field.getType() == CommonConstants.TYPE_NUMBER
- ? Double.parseDouble(value)
- : value;
- dataMap.put(field.getName(), fieldValue);
+ if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) {
+ if (field.getType() == CommonConstants.TYPE_NUMBER) {
+ dataMap.put(field.getName(), Double.parseDouble(valueRow.getColumns(i)));
+ } else if (field.getType() == CommonConstants.TYPE_STRING) {
+ dataMap.put(field.getName(), valueRow.getColumns(i));
+ }
+ if (field.getLabel()) {
+ labels.put(field.getName(), valueRow.getColumns(i));
+ }
+ } else {
+ dataMap.put(field.getName(), null);
}
}
- break;
+
}
Map<String, Object> outerMap = new HashMap<>();
outerMap.put("ts",
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss.SSS")));
outerMap.put("monitor_id", monitorId);
outerMap.put("metrics", metrics);
+ outerMap.put("label", labels);
outerMap.put("v", dataMap);
String json = JsonUtil.toJson(outerMap);
try {
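
For readers skimming the last hunk: each metrics row is serialized as one JSON
document, with the new label map written to the label column and the field map
written to the VARIANT column v. A minimal standalone sketch of that mapping
(field names and values are invented for illustration; Jackson's ObjectMapper
stands in here for HertzBeat's JsonUtil):

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class DorisRowJsonExample {
    public static void main(String[] args) throws Exception {
        // Values parsed from one CollectRep.ValueRow (illustrative only).
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("usage", 12.5);    // TYPE_NUMBER field -> Double
        dataMap.put("core", "cpu0");   // TYPE_STRING field -> String

        Map<String, String> labels = new HashMap<>();
        labels.put("core", "cpu0");    // fields flagged as labels

        Map<String, Object> outerMap = new HashMap<>();
        outerMap.put("ts", LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
        outerMap.put("monitor_id", 1L);
        outerMap.put("metrics", "cpu");
        outerMap.put("label", labels); // new label column
        outerMap.put("v", dataMap);    // VARIANT column v

        // Example output (key order may vary):
        // {"ts":"2024-08-28 00:47:26.000","monitor_id":1,"metrics":"cpu",
        //  "label":{"core":"cpu0"},"v":{"usage":12.5,"core":"cpu0"}}
        System.out.println(new ObjectMapper().writeValueAsString(outerMap));
    }
}
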
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]