This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch duckdb
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/duckdb by this push:
     new 44807fbf5 feat(warehouse): implement DuckDB data storage
44807fbf5 is described below

commit 44807fbf54fbfe3df28452eaea3a01587c51ad17
Author: Logic <[email protected]>
AuthorDate: Tue Dec 17 23:33:33 2024 +0800

    feat(warehouse): implement DuckDB data storage
    
    - Add DuckDBDataStorage class to handle data storage operations
    - Implement saveData and getHistoryMetricData methods
    - Update application.yml to enable DuckDB storage
    - Add DuckDB JDBC dependency to pom.xml
---
 .../src/main/resources/application.yml             |   6 +-
 hertzbeat-warehouse/pom.xml                        |   6 +
 .../store/history/duckdb/DuckDBDataStorage.java    | 285 ++++++++++++++++-----
 3 files changed, 227 insertions(+), 70 deletions(-)

diff --git a/hertzbeat-manager/src/main/resources/application.yml 
b/hertzbeat-manager/src/main/resources/application.yml
index 544abc6cf..12b4af16d 100644
--- a/hertzbeat-manager/src/main/resources/application.yml
+++ b/hertzbeat-manager/src/main/resources/application.yml
@@ -77,7 +77,7 @@ spring:
       max-lifetime: 120000
 
   jpa:
-    show-sql: false
+    show-sql: true
     database-platform: org.eclipse.persistence.platform.database.MySQLPlatform
     database: h2
     properties:
@@ -125,14 +125,14 @@ warehouse:
   store:
     # store history metrics data, enable only one below
     jpa:
-      enabled: true
+      enabled: false
       # The maximum retention time for history records, after which records 
will be deleted
       expire-time: 1h
       # The maximum number of history records retained, if this number is 
exceeded, half of the data in this configuration item will be deleted
       # (please set this configuration reasonably as history records can 
affect performance when it is large)
       max-history-record-num: 6000
     duckdb:
-      enabled: false
+      enabled: true
       driver-class-name: org.duckdb.DuckDBDriver
       url: jdbc:duckdb:./data/hertzbeat.duckdb
     victoria-metrics:
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index caa9d0c56..88df2f0e2 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -120,5 +120,11 @@
             <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.duckdb</groupId>
+            <artifactId>duckdb_jdbc</artifactId>
+            <version>1.1.3</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/duckdb/DuckDBDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/duckdb/DuckDBDataStorage.java
index 86a60278c..0870d9f7a 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/duckdb/DuckDBDataStorage.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/duckdb/DuckDBDataStorage.java
@@ -1,16 +1,31 @@
 package org.apache.hertzbeat.warehouse.store.history.duckdb;
 
+import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZonedDateTime;
+import java.time.temporal.TemporalAmount;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.Value;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.util.TimePeriodUtil;
 import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage;
+import org.duckdb.DuckDBConnection;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Primary;
 import org.springframework.stereotype.Component;
@@ -25,95 +40,231 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class DuckDBDataStorage extends AbstractHistoryDataStorage {
 
-    private static final String CONSTANTS_URL_PREFIX = "jdbc:TAOS-RS://";
     private static final Pattern SQL_SPECIAL_STRING_PATTERN = 
Pattern.compile("(\\\\)|(')");
-    private static final String INSTANCE_NULL = "''";
-    private static final String CONSTANTS_CREATE_DATABASE = "CREATE DATABASE 
IF NOT EXISTS %s";
-    private static final String INSERT_TABLE_DATA_SQL = "INSERT INTO `%s` 
USING `%s` TAGS (%s) VALUES %s";
-    private static final String CREATE_SUPER_TABLE_SQL = "CREATE STABLE IF NOT 
EXISTS `%s` %s TAGS (monitor BIGINT)";
-    private static final String NO_SUPER_TABLE_ERROR = "Table does not exist";
+    private static final String CONSTANTS_CREATE_TABLE = "CREATE TABLE IF NOT 
EXISTS hzb_history (monitorId LONG, app STRING, metrics STRING, time LONG, 
metric STRING, metricType INT, dou DOUBLE, str STRING, int32 INT, instance 
STRING)";
+    private static final String INSERT_TABLE_DATA_SQL = "INSERT INTO 
hzb_history (monitorId, app, metrics, time, metric, metricType, dou, str, 
int32, instance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    private static final String QUERY_HISTORY_WITH_INSTANCE_SQL = "SELECT 
time, instance, %s FROM hzb_history WHERE instance = '%s' AND time >= ? ORDER 
BY time DESC";
+    private static final String QUERY_HISTORY_SQL = "SELECT time, instance, %s 
FROM hzb_history WHERE time >= ? ORDER BY time DESC";
 
-    private static final String QUERY_HISTORY_WITH_INSTANCE_SQL =
-            "SELECT ts, instance, `%s` FROM `%s` WHERE instance = '%s' AND ts 
>= now - %s order by ts desc";
-    private static final String QUERY_HISTORY_SQL =
-            "SELECT ts, instance, `%s` FROM `%s` WHERE ts >= now - %s order by 
ts desc";
-    private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL =
-            "SELECT first(ts), first(`%s`), avg(`%s`), min(`%s`), max(`%s`) 
FROM `%s` WHERE instance = '%s' AND ts >= now - %s interval(4h)";
-    private static final String QUERY_INSTANCE_SQL =
-            "SELECT DISTINCT instance FROM `%s` WHERE ts >= now - 1w";
-
-    private static final String TABLE_NOT_EXIST = "Table does not exist";
-
-    private static final String HERTZBEAT = "hertzbeat";
+    private final String duckDBUrl;
 
     public DuckDBDataStorage(DuckDBProperties duckDBProperties) {
         if (duckDBProperties == null) {
             log.error("init error, please config Warehouse DuckDB props in 
application.yml");
             throw new IllegalArgumentException("please config Warehouse DuckDB 
props");
         }
+        this.serverAvailable = true;
+        this.duckDBUrl = duckDBProperties.url();
+        initDuckDBDatabase();
     }
 
-    private HikariDataSource hikariDataSource;
-
-    /**
-     * init duckdb data storage
-     *
-     * @param duckDBProperties duckdb properties
-     */
-    private void initDuckDBDatabase(final DuckDBProperties duckDBProperties) 
throws SQLException {
-        try (
-                final Connection tempConnection = 
DriverManager.getConnection(duckDBProperties.url())
-        ) {
-            
tempConnection.prepareStatement(String.format(CONSTANTS_CREATE_DATABASE, 
HERTZBEAT))
-                    .execute();
+    private void initDuckDBDatabase() {
+        try {
+            DuckDBConnection conn = (DuckDBConnection) 
DriverManager.getConnection(duckDBUrl);
+            conn.prepareStatement(CONSTANTS_CREATE_TABLE).execute();
+        } catch (SQLException e) {
+            log.error("DuckDB create table error", e);
+            throw new RuntimeException("DuckDB create table error");
         }
     }
 
-    /**
-     * query history range metrics data from time-series db
-     *
-     * @param monitorId monitor id
-     * @param app       monitor type
-     * @param metrics   metrics
-     * @param metric    metric
-     * @param label     label
-     * @param history   range
-     * @return metrics data
-     */
     @Override
-    public Map<String, List<Value>> getHistoryMetricData(Long monitorId, 
String app, String metrics, String metric, String label, String history) {
-        return Map.of();
+    public void saveData(CollectRep.MetricsData metricsData) {
+        if (!isServerAvailable() || metricsData.getCode() != 
CollectRep.Code.SUCCESS) {
+            return;
+        }
+        if (metricsData.getValuesList().isEmpty()) {
+            log.info("[warehouse duckdb] flush metrics data {} is null, 
ignore.", metricsData.getId());
+            return;
+        }        
+        List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
+
+        try (
+                DuckDBConnection conn = (DuckDBConnection) 
DriverManager.getConnection(duckDBUrl);
+                var appender = 
conn.createAppender(DuckDBConnection.DEFAULT_SCHEMA, "hzb_history")) {
+            for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
+                for (int i = 0; i < fieldsList.size(); i++) {
+                    CollectRep.Field field = fieldsList.get(i);
+                    String columnValue = valueRow.getColumns(i);
+                    appender.beginRow();
+                    // TODO: id
+                    appender.append(metricsData.getId());
+                    appender.append(metricsData.getApp());
+                    appender.append(metricsData.getMetrics());
+                    // metric
+                    appender.append(field.getName());
+                    // instance
+                    Map<String, String> labels = new HashMap<>(8);
+
+                    if (field.getLabel()) {
+                        labels.put(field.getName(), columnValue);
+                    }
+                    appender.append(JsonUtil.toJson(labels));
+                    appender.append(field.getType());
+                    if (CommonConstants.NULL_VALUE.equals(columnValue)) {
+                        switch (field.getType()) {
+                            case CommonConstants.TYPE_NUMBER:
+                                
appender.append(Double.parseDouble(columnValue));
+                                appender.append(null);
+                                appender.append(null);
+                                break;
+                            case CommonConstants.TYPE_STRING:
+                                appender.append(null);
+                                
appender.append(formatStringValue(columnValue));
+                                appender.append(null);
+                                break;
+                            case CommonConstants.TYPE_TIME:
+                                appender.append(null);
+                                appender.append(null);
+                                appender.append(Integer.parseInt(columnValue));
+                                break;
+                            default:
+                                
appender.append(Double.parseDouble(columnValue));
+                                appender.append(null);
+                                appender.append(null);
+                                break;
+                        }
+                    }
+                    appender.append(metricsData.getTime());
+
+                    appender.append(JsonUtil.toJson(new HashMap<>()));
+                    appender.endRow();
+                }
+            }
+        } catch (SQLException e) {
+            log.error("Error saving data to DuckDB: ", e);
+        }
     }
 
-    /**
-     * query history range interval metrics data from time-series db
-     * max min mean metrics value
-     *
-     * @param monitorId monitor id
-     * @param app       monitor type
-     * @param metrics   metrics
-     * @param metric    metric
-     * @param label     label
-     * @param history   history range
-     * @return metrics data
-     */
-    @Override
-    public Map<String, List<Value>> getHistoryIntervalMetricData(Long 
monitorId, String app, String metrics, String metric, String label, String 
history) {
-        return Map.of();
+    public Map<String, List<Value>> getHistoryMetricData(Long monitorId, 
String app, String metrics,
+                                                         String metric, String 
label, String history) {
+        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
+        Connection connection;
+        try {
+            connection = DriverManager.getConnection(duckDBUrl);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        ;
+        PreparedStatement pstmt = null;
+        ResultSet rs = null;
+
+        try {
+
+            // Build the base SQL query with correct column names
+            StringBuilder sql = new StringBuilder(
+                    "SELECT * FROM hzb_history WHERE monitorId = ? AND app = ? 
AND metrics = ? AND metric = ?");
+
+            // Add optional conditions
+            if (StringUtils.isNotBlank(label)) {
+                sql.append(" AND instance = ?");
+            }
+
+            // Add time condition if history parameter is provided
+            if (history != null) {
+                try {
+                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                    ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
+                    long timeBefore = dateTime.toEpochSecond() * 1000L;
+                    sql.append(" AND time >= ?");
+                } catch (Exception e) {
+                    log.error("Error parsing history time: {}", 
e.getMessage());
+                }
+            }
+
+            // Add ordering
+            sql.append(" ORDER BY time DESC");
+
+            // Prepare statement and set parameters
+            pstmt = connection.prepareStatement(sql.toString());
+            int paramIndex = 1;
+            pstmt.setLong(paramIndex++, monitorId);
+            pstmt.setString(paramIndex++, app);
+            pstmt.setString(paramIndex++, metrics);
+            pstmt.setString(paramIndex++, metric);
+
+            if (StringUtils.isNotBlank(label)) {
+                pstmt.setString(paramIndex++, label);
+            }
+
+            if (history != null) {
+                try {
+                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                    ZonedDateTime dateTime = 
ZonedDateTime.now().minus(temporalAmount);
+                    long timeBefore = dateTime.toEpochSecond() * 1000L;
+                    pstmt.setLong(paramIndex, timeBefore);
+                } catch (Exception e) {
+                    log.error("Error setting time parameter: {}", 
e.getMessage());
+                }
+            }
+
+            // Execute query and process results
+            rs = pstmt.executeQuery();
+            while (rs.next()) {
+                String value = "";
+                int metricType = rs.getInt("metricType"); // Assuming the 
column name is metricType
+
+                if (metricType == CommonConstants.TYPE_NUMBER) {
+                    Double douValue = rs.getDouble("dou");
+                    if (douValue != null && !rs.wasNull()) {
+                        value = BigDecimal.valueOf(douValue)
+                                .setScale(4, RoundingMode.HALF_UP)
+                                .stripTrailingZeros()
+                                .toPlainString();
+                    }
+                } else {
+                    String strValue = rs.getString("str");
+                    value = strValue != null ? strValue : "";
+                }
+
+                String instanceValue = rs.getString("instance");
+                instanceValue = instanceValue != null ? instanceValue : "";
+
+                List<Value> valueList = 
instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
+                valueList.add(new Value(value, rs.getLong("time")));
+            }
+
+        } catch (SQLException e) {
+            log.error("Error querying history metric data: ", e);
+        } finally {
+            // Close resources in reverse order
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    log.error("Error closing ResultSet: ", e);
+                }
+            }
+            if (pstmt != null) {
+                try {
+                    pstmt.close();
+                } catch (SQLException e) {
+                    log.error("Error closing PreparedStatement: ", e);
+                }
+            }
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    log.error("Error closing Connection: ", e);
+                }
+            }
+        }
+
+        return instanceValuesMap;
     }
 
-    /**
-     * save metrics data
-     *
-     * @param metricsData metrics data
-     */
     @Override
-    public void saveData(CollectRep.MetricsData metricsData) {
-
+    public Map<String, List<Value>> getHistoryIntervalMetricData(Long 
monitorId, String app, String metrics, String metric, String label, String 
history) {
+        // Placeholder implementation, adjust based on requirements
+        return new HashMap<>(8);
     }
 
     @Override
     public void destroy() throws Exception {
+    }
 
+    private String formatStringValue(String value) {
+        return SQL_SPECIAL_STRING_PATTERN.matcher(value).replaceAll("\\\\$0");
     }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to