This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch doris
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/doris by this push:
new b11cea90c fix(warehouse): update Doris database name to 'hertzbeat'
b11cea90c is described below
commit b11cea90c79ba9b75e114bbb59372f7a5bda452e
Author: zqr10159 <[email protected]>
AuthorDate: Sun Aug 25 20:10:21 2024 +0800
fix(warehouse): update Doris database name to 'hertzbeat'
Change the default database name in DorisDataStorage and DorisProperties
from _internal_schema to hertzbeat to ensure metrics data is stored in the
correct database.
This aligns with the intended configuration for history data storage in
Doris.
---
manager/src/main/resources/application.yml | 2 +-
.../store/history/doris/DorisDataStorage.java | 115 +++++++++++++++++++++
.../store/history/doris/DorisProperties.java | 3 +-
.../warehouse/utils/DorisStreamLoadUtil.java | 6 +-
4 files changed, 122 insertions(+), 4 deletions(-)
diff --git a/manager/src/main/resources/application.yml
b/manager/src/main/resources/application.yml
index 1902ae6ec..10f389150 100644
--- a/manager/src/main/resources/application.yml
+++ b/manager/src/main/resources/application.yml
@@ -171,7 +171,7 @@ warehouse:
httpPort: 8030
username: root
password:
- db: _internal_schema
+ database: hertzbeat
table: hzb_history
expire-time: '30d'
# store real-time metrics data, enable only one below
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
index 3a92892de..c732247c3 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisDataStorage.java
@@ -18,12 +18,19 @@
package org.apache.hertzbeat.warehouse.store.history.doris;
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
+import org.apache.hertzbeat.warehouse.utils.DorisStreamLoadUtil;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@@ -31,6 +38,59 @@ import org.springframework.stereotype.Component;
@ConditionalOnProperty(prefix = "warehouse.store.doris", name = "enabled",
havingValue = "true")
@Slf4j
public class DorisDataStorage extends AbstractHistoryDataStorage {
+
+ private HikariDataSource hikariDataSource;
+
+ @Autowired
+ private DorisProperties dorisProperties;
+
+ public DorisDataStorage(DorisProperties dorisProperties) {
+ if (!dorisProperties.enabled()) {
+ return;
+ }
+ this.hikariDataSource = new HikariDataSource();
+ this.hikariDataSource.setJdbcUrl("jdbc:mysql://" +
dorisProperties.host() + ":" + dorisProperties.jdbcPort() + "/" +
dorisProperties.database());
+ this.hikariDataSource.setUsername(dorisProperties.username());
+ this.hikariDataSource.setPassword(dorisProperties.password());
+ this.hikariDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
+ // minimum number of idle connection
+ this.hikariDataSource.setMinimumIdle(10);
+ // maximum number of connection in the pool
+ this.hikariDataSource.setMaximumPoolSize(10);
+ // maximum wait milliseconds for get connection from pool
+ this.hikariDataSource.setConnectionTimeout(30000);
+ // maximum lifetime for each connection
+ this.hikariDataSource.setMaxLifetime(0);
+ // max idle time for recycle idle connection
+ this.hikariDataSource.setIdleTimeout(0);
+ // validation query
+ this.hikariDataSource.setConnectionTestQuery("select 1");
+ createDatabase(dorisProperties.database());
+
+ }
+
+ private void createDatabase(String database) {
+ try (Connection connection = hikariDataSource.getConnection()) {
+ connection.createStatement().execute("CREATE DATABASE IF NOT
EXISTS " + database);
+ } catch (SQLException e) {
+ log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+ }
+ }
+
+ private void createTable(String database, String tableName) {
+ try (Connection connection = hikariDataSource.getConnection()) {
+ connection.createStatement().execute("USE " + database);
+ connection.createStatement().execute("CREATE TABLE IF NOT EXISTS "
+ tableName);
+ } catch (SQLException e) {
+ log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+ }
+ }
+
+ private String getTableName(String app, String metrics) {
+ return app + "_" + metrics;
+ }
+
+
/**
* query history range metrics data from doris
*
@@ -71,9 +131,64 @@ public class DorisDataStorage extends
AbstractHistoryDataStorage {
*/
@Override
public void saveData(CollectRep.MetricsData metricsData) {
+ if (!isServerAvailable() || metricsData.getCode() !=
CollectRep.Code.SUCCESS) {
+ return;
+ }
+ if (metricsData.getValuesList().isEmpty()) {
+ log.info("[warehouse doris] flush metrics data {} is null,
ignore.", metricsData.getId());
+ return;
+ }
+ String monitorId = String.valueOf(metricsData.getId());
+ String tableName = getTableName(metricsData.getApp(),
metricsData.getMetrics());
+ createTable(dorisProperties.database(), tableName);
+ List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
+ StringBuilder jsonData = new StringBuilder();
+ jsonData.append("[");
+
+ for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
+ StringBuilder row = new StringBuilder();
+ row.append("{");
+ row.append("\"monitor_id\":\"").append(monitorId).append("\",");
+ for (int i = 0; i < fieldsList.size(); i++) {
+ CollectRep.Field field = fieldsList.get(i);
+ String value = valueRow.getColumns(i);
+ if (!CommonConstants.NULL_VALUE.equals(value)) {
+ if (field.getType() == CommonConstants.TYPE_NUMBER) {
+
row.append("\"").append(field.getName()).append("\":").append(Double.parseDouble(value)).append(",");
+ } else if (field.getType() == CommonConstants.TYPE_STRING)
{
+
row.append("\"").append(field.getName()).append("\":\"").append(value).append("\",");
+ }
+ }
+ }
+ row.append("\"ts\":").append(System.currentTimeMillis());
+ row.append("},");
+ jsonData.append(row);
+ }
+ // Remove the trailing comma and close the JSON array
+ if (jsonData.charAt(jsonData.length() - 1) == ',') {
+ jsonData.setCharAt(jsonData.length() - 1, ']');
+ } else {
+ jsonData.append(']');
+ }
+
+ try {
+ DorisStreamLoadUtil.sendData(
+ dorisProperties.host(),
+ dorisProperties.httpPort(),
+ dorisProperties.username(),
+ dorisProperties.password(),
+ dorisProperties.database(),
+ tableName,
+ jsonData.toString()
+ );
+ log.debug("[warehouse doris]-Write successful");
+ } catch (Exception e) {
+ log.error("[warehouse doris]--Error: {}", e.getMessage(), e);
+ }
}
+
@Override
public void destroy() throws Exception {
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
index b3118ceb6..bd9a95356 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/doris/DorisProperties.java
@@ -32,9 +32,10 @@ import
org.springframework.boot.context.properties.bind.DefaultValue;
public record DorisProperties(@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("8030") int httpPort,
+ @DefaultValue("9030") int jdbcPort,
@DefaultValue("root") String username,
@DefaultValue("") String password,
- @DefaultValue("_internal_schema") String db,
+ @DefaultValue("hertzbeat") String database,
@DefaultValue("hzb_history") String table,
// Database TTL, default is 30 days.
@DefaultValue("30d") String expireTime) {
diff --git
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
index 464569c28..125855976 100644
---
a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
+++
b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/utils/DorisStreamLoadUtil.java
@@ -19,6 +19,7 @@ package org.apache.hertzbeat.warehouse.utils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
@@ -32,19 +33,20 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
@Slf4j
+@Data
public class DorisStreamLoadUtil {
public static void sendData(String host,
int httpPort,
String username,
String password,
- String db,
+ String database,
String table,
String content) throws Exception {
final String loadUrl =
String.format("http://%s:%s/api/%s/%s/_stream_load",
host,
httpPort,
- db,
+ database,
table);
final HttpClientBuilder httpClientBuilder = HttpClients
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]