Copilot commented on code in PR #3887:
URL: https://github.com/apache/hertzbeat/pull/3887#discussion_r2584481978
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java:
##########
@@ -19,111 +19,223 @@
package org.apache.hertzbeat.warehouse.db;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.MediaType;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.beans.PropertyDescriptor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL
+ * query executor for GreptimeDB SQL via JDBC
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
- private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
+ private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
+ private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
+
+ private final JdbcTemplate jdbcTemplate;
+ private final HikariDataSource dataSource;
+
+ @Autowired
+ public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
+ super(null, null); // No longer using RestTemplate or HttpSqlProperties
+
+ // Initialize JDBC DataSource
+ this.dataSource = new HikariDataSource();
- private final GreptimeProperties greptimeProperties;
+ // Construct JDBC URL: jdbc:postgresql://endpoint/database
+ String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
+ this.dataSource.setJdbcUrl(jdbcUrl);
+ // Fixed driver class name for PostgreSQL protocol
+ this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
+
+ if (greptimeProperties.username() != null) {
+ this.dataSource.setUsername(greptimeProperties.username());
+ }
+ if (greptimeProperties.password() != null) {
+ this.dataSource.setPassword(greptimeProperties.password());
+ }
+ this.dataSource.setMaximumPoolSize(10);
+ this.dataSource.setMinimumIdle(2);
+ this.dataSource.setConnectionTimeout(30000);
+ this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+ log.info("Initialized GreptimeDB JDBC connection to {}", jdbcUrl);
+ }
+
+ /**
+ * Constructor for compatibility with existing tests.
+ * delegating to the main constructor.
+ * @param greptimeProperties greptime properties
+ * @param restTemplate (unused) rest template
+ */
public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
- super(restTemplate, new
SqlQueryExecutor.HttpSqlProperties(greptimeProperties.httpEndpoint() +
QUERY_PATH,
- greptimeProperties.username(), greptimeProperties.password()));
- this.greptimeProperties = greptimeProperties;
+ this(greptimeProperties);
+ }
+
+ /**
+ * Constructor for testing purposes only.
+ * @param jdbcTemplate Mocked JdbcTemplate
+ */
+ public GreptimeSqlQueryExecutor(JdbcTemplate jdbcTemplate) {
+ super(null, null);
+ this.dataSource = null;
+ this.jdbcTemplate = jdbcTemplate;
}
@Override
- public List<Map<String, Object>> execute(String queryString) {
- List<Map<String, Object>> results = new LinkedList<>();
+ public List<Map<String, Object>> execute(String sql) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
+ return jdbcTemplate.queryForList(sql);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String requestBody = "sql=" + queryString;
- HttpEntity<String> httpEntity = new HttpEntity<>(requestBody,
headers);
+ public int delete(String sql, Object... args) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
+ try {
+ Integer result = jdbcTemplate.execute(sql,
(PreparedStatementCallback<Integer>) ps -> {
+ if (args != null && args.length > 0) {
+ ArgumentPreparedStatementSetter setter = new
ArgumentPreparedStatementSetter(args);
+ setter.setValues(ps);
+ }
+ boolean hasResultSet = ps.execute();
+ int updateCount = ps.getUpdateCount();
+ if (updateCount == -1 && hasResultSet) {
+ log.warn("GreptimeDB returned a ResultSet for DELETE
operation. Ignoring protocol error and assuming success.");
+ return 0;
+ }
+ return updateCount;
+ });
+ return result != null ? result : 0;
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String url = greptimeProperties.httpEndpoint() + QUERY_PATH;
- if (StringUtils.hasText(greptimeProperties.database())) {
- url += "?db=" + greptimeProperties.database();
- }
+ /**
+ * Execute SQL query with arguments (Prepared Statement)
+ * @param sql SQL query with ? placeholders
+ * @param args Arguments for placeholders
+ * @return List of rows
+ */
+ public List<LogEntry> query(String sql, Object... args) {
+ log.debug("Executing GreptimeDB SQL: {} with args: {}", sql, args);
+ try {
+ // Use custom RowMapper that extends BeanPropertyRowMapper
+ return jdbcTemplate.query(sql, new GreptimeLogEntryRowMapper(),
args);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- ResponseEntity<GreptimeSqlQueryContent> responseEntity =
restTemplate.exchange(url,
- HttpMethod.POST, httpEntity,
GreptimeSqlQueryContent.class);
-
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- GreptimeSqlQueryContent responseBody =
responseEntity.getBody();
- if (responseBody != null && responseBody.getCode() == 0
- && responseBody.getOutput() != null &&
!responseBody.getOutput().isEmpty()) {
-
- for (GreptimeSqlQueryContent.Output output :
responseBody.getOutput()) {
- if (output.getRecords() != null &&
output.getRecords().getRows() != null) {
- GreptimeSqlQueryContent.Output.Records.Schema
schema = output.getRecords().getSchema();
- List<List<Object>> rows =
output.getRecords().getRows();
-
- for (List<Object> row : rows) {
- Map<String, Object> rowMap = new HashMap<>();
- if (schema != null &&
schema.getColumnSchemas() != null) {
- for (int i = 0; i <
Math.min(schema.getColumnSchemas().size(), row.size()); i++) {
- String columnName =
schema.getColumnSchemas().get(i).getName();
- Object value = row.get(i);
- rowMap.put(columnName, value);
- }
- } else {
- for (int i = 0; i < row.size(); i++) {
- rowMap.put("col_" + i, row.get(i));
- }
- }
- results.add(rowMap);
- }
- }
- }
- }
- } else {
- log.error("query metrics data from greptime failed. {}",
responseEntity);
- }
+ /**
+ * Execute count SQ
+ * @param sql SQL
+ * @return count
Review Comment:
The JavaDoc comment has a typo and is incomplete: "Execute count SQ" should
be "Execute count SQL query". The comment should also describe the parameters
and what the method does more clearly.
```suggestion
* Execute a count SQL query and return the result.
* @param sql the SQL query string, typically a SELECT COUNT(*)
statement with optional ? placeholders
* @param args arguments to fill the placeholders in the SQL query
* @return the count result as a Long, or throws an exception if the
query fails
```
##########
web-app/src/assets/i18n/zh-CN.json:
##########
@@ -685,6 +685,8 @@
"log.stream.severity-number-placeholder": "输入日志级别编号",
"log.stream.severity-text": "日志级别:",
"log.stream.severity-text-placeholder": "输入日志级别",
+ "log.stream.content" : "日志内容",
+ "log.stream.content-placeholder" : "输入日志内容",
Review Comment:
There are extra spaces around the colon in the JSON key-value pairs (` : `
instead of `: `). While this is valid JSON, it's inconsistent with the rest of
the file. For consistency, these should match the formatting of other entries.
```suggestion
"log.stream.content": "日志内容",
"log.stream.content-placeholder": "输入日志内容",
```
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java:
##########
@@ -507,7 +509,7 @@ public void saveLogData(LogEntry logEntry) {
logEntry.getObservedTimeUnixNano() != null ?
logEntry.getObservedTimeUnixNano() : System.nanoTime(),
logEntry.getSeverityNumber(),
logEntry.getSeverityText(),
- JsonUtil.toJson(logEntry.getBody()),
+ logEntry.getBody(),
Review Comment:
The `body` field data type has been changed from `DataType.Json` to
`DataType.String`, but at line 512, `logEntry.getBody()` is being stored
directly without conversion. The `LogEntry.getBody()` method returns an
`Object` which could be a Map, List, or other complex type. This will cause a
type mismatch error when writing to GreptimeDB. The code should either: 1) Keep
`DataType.Json` and convert with `JsonUtil.toJson(logEntry.getBody())`, or 2)
Keep `DataType.String` but ensure `logEntry.getBody()` is converted to a String
(e.g., using `String.valueOf()` or `JsonUtil.toJson()` if it's a complex
object).
```suggestion
JsonUtil.toJson(logEntry.getBody()),
```
##########
script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml:
##########
@@ -149,6 +149,7 @@ warehouse:
enabled: true
grpc-endpoints: greptime:4001
http-endpoint: http://greptime:4000
+ postgres-endpoint: localhost:4003
Review Comment:
The `postgres-endpoint` is set to `localhost:4003`, but in a Docker Compose
environment, services should reference each other by service name, not
localhost. This should be `greptime:4003` to match the pattern used for
`grpc-endpoints` (line 150) and `http-endpoint` (line 151).
```suggestion
postgres-endpoint: greptime:4003
```
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java:
##########
@@ -19,111 +19,223 @@
package org.apache.hertzbeat.warehouse.db;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.MediaType;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.beans.PropertyDescriptor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL
+ * query executor for GreptimeDB SQL via JDBC
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
- private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
+ private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
+ private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
+
+ private final JdbcTemplate jdbcTemplate;
+ private final HikariDataSource dataSource;
+
+ @Autowired
+ public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
+ super(null, null); // No longer using RestTemplate or HttpSqlProperties
+
+ // Initialize JDBC DataSource
+ this.dataSource = new HikariDataSource();
- private final GreptimeProperties greptimeProperties;
+ // Construct JDBC URL: jdbc:postgresql://endpoint/database
+ String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
+ this.dataSource.setJdbcUrl(jdbcUrl);
+ // Fixed driver class name for PostgreSQL protocol
+ this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
+
+ if (greptimeProperties.username() != null) {
+ this.dataSource.setUsername(greptimeProperties.username());
+ }
+ if (greptimeProperties.password() != null) {
+ this.dataSource.setPassword(greptimeProperties.password());
+ }
+ this.dataSource.setMaximumPoolSize(10);
+ this.dataSource.setMinimumIdle(2);
+ this.dataSource.setConnectionTimeout(30000);
Review Comment:
The HikariCP connection pool configuration uses hardcoded values
(maxPoolSize=10, minIdle=2, connectionTimeout=30000). These should be
configurable via the `GreptimeProperties` record to allow users to tune the
connection pool based on their workload requirements. Consider adding fields
like `maxPoolSize`, `minIdle`, and `connectionTimeout` to the properties
configuration.
```suggestion
this.dataSource.setMaximumPoolSize(
greptimeProperties.maxPoolSize() != null ?
greptimeProperties.maxPoolSize() : 10
);
this.dataSource.setMinimumIdle(
greptimeProperties.minIdle() != null ?
greptimeProperties.minIdle() : 2
);
this.dataSource.setConnectionTimeout(
greptimeProperties.connectionTimeout() != null ?
greptimeProperties.connectionTimeout() : 30000
);
```
##########
hertzbeat-warehouse/pom.xml:
##########
@@ -61,13 +57,16 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
- <!-- taos-jdbc driver -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jdbc</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
Review Comment:
The `GreptimeSqlQueryExecutor` class uses `HikariDataSource` but there's no
explicit HikariCP dependency in the pom.xml. While `spring-boot-starter-jdbc`
includes HikariCP by default, this implicit dependency should be explicitly
declared for clarity and to prevent issues if Spring Boot changes its default
JDBC connection pool implementation in future versions. Add
`<dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency>`
to make this dependency explicit.
```suggestion
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
```
##########
hertzbeat-log/src/main/java/org/apache/hertzbeat/log/notice/LogSseFilterCriteria.java:
##########
@@ -49,7 +49,13 @@ public class LogSseFilterCriteria {
*/
@Schema(description = "The severity text (also known as log level).",
example = "INFO", accessMode = READ_WRITE)
private String severityText;
-
+
+ /**
+ * Log content text filtering (case-insensitive contains match).
+ */
+ @Schema(description = "Log content text filtering", example = "error
occurred", accessMode = READ_WRITE)
+ private String logContent;
Review Comment:
The field name `logContent` is inconsistent with the regular query API
endpoints in `LogQueryController` which use the parameter name `search` (line
80). This inconsistency can be confusing for API consumers. Consider renaming
this field to `search` to maintain consistency across all log query APIs, or at
least document this difference clearly.
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java:
##########
@@ -806,7 +706,7 @@ private void doSaveLogBatch(List<LogEntry> logEntries) {
logEntry.getObservedTimeUnixNano() != null ?
logEntry.getObservedTimeUnixNano() : System.nanoTime(),
logEntry.getSeverityNumber(),
logEntry.getSeverityText(),
- JsonUtil.toJson(logEntry.getBody()),
+ logEntry.getBody(),
Review Comment:
Same issue as in `saveLogData()`: the `body` field is declared as
`DataType.String`, but at line 709, `logEntry.getBody()` is being stored
directly without conversion. This will cause a type mismatch error when the
body is a complex object (Map, List, etc.). The code should convert the body to
String, similar to how attributes, resource, and instrumentation_scope are
handled with `JsonUtil.toJson()`.
```suggestion
JsonUtil.toJson(logEntry.getBody()),
```
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java:
##########
@@ -19,111 +19,223 @@
package org.apache.hertzbeat.warehouse.db;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.MediaType;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.beans.PropertyDescriptor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL
+ * query executor for GreptimeDB SQL via JDBC
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
- private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
+ private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
+ private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
+
+ private final JdbcTemplate jdbcTemplate;
+ private final HikariDataSource dataSource;
+
+ @Autowired
+ public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
+ super(null, null); // No longer using RestTemplate or HttpSqlProperties
+
+ // Initialize JDBC DataSource
+ this.dataSource = new HikariDataSource();
- private final GreptimeProperties greptimeProperties;
+ // Construct JDBC URL: jdbc:postgresql://endpoint/database
+ String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
+ this.dataSource.setJdbcUrl(jdbcUrl);
+ // Fixed driver class name for PostgreSQL protocol
+ this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
+
+ if (greptimeProperties.username() != null) {
+ this.dataSource.setUsername(greptimeProperties.username());
+ }
+ if (greptimeProperties.password() != null) {
+ this.dataSource.setPassword(greptimeProperties.password());
+ }
+ this.dataSource.setMaximumPoolSize(10);
+ this.dataSource.setMinimumIdle(2);
+ this.dataSource.setConnectionTimeout(30000);
+ this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+ log.info("Initialized GreptimeDB JDBC connection to {}", jdbcUrl);
+ }
+
+ /**
+ * Constructor for compatibility with existing tests.
+ * delegating to the main constructor.
+ * @param greptimeProperties greptime properties
+ * @param restTemplate (unused) rest template
+ */
public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
- super(restTemplate, new
SqlQueryExecutor.HttpSqlProperties(greptimeProperties.httpEndpoint() +
QUERY_PATH,
- greptimeProperties.username(), greptimeProperties.password()));
- this.greptimeProperties = greptimeProperties;
+ this(greptimeProperties);
+ }
+
+ /**
+ * Constructor for testing purposes only.
+ * @param jdbcTemplate Mocked JdbcTemplate
+ */
+ public GreptimeSqlQueryExecutor(JdbcTemplate jdbcTemplate) {
+ super(null, null);
+ this.dataSource = null;
+ this.jdbcTemplate = jdbcTemplate;
}
@Override
- public List<Map<String, Object>> execute(String queryString) {
- List<Map<String, Object>> results = new LinkedList<>();
+ public List<Map<String, Object>> execute(String sql) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
+ return jdbcTemplate.queryForList(sql);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String requestBody = "sql=" + queryString;
- HttpEntity<String> httpEntity = new HttpEntity<>(requestBody,
headers);
+ public int delete(String sql, Object... args) {
Review Comment:
[nitpick] The `delete` method returns an int, but the calling code in
`GreptimeDbDataStorage.batchDeleteLogs()` expects a boolean result. However,
the method is named `delete` but the caller uses it and converts to boolean,
which works. Consider renaming this method to `executeUpdate` or similar to
better reflect that it can be used for any DML operation (DELETE, UPDATE,
INSERT) that returns an update count.
##########
hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java:
##########
@@ -19,111 +19,223 @@
package org.apache.hertzbeat.warehouse.db;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.MediaType;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
+import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.beans.PropertyDescriptor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL
+ * query executor for GreptimeDB SQL via JDBC
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
- private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
+ private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
+ private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
+
+ private final JdbcTemplate jdbcTemplate;
+ private final HikariDataSource dataSource;
+
+ @Autowired
+ public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
+ super(null, null); // No longer using RestTemplate or HttpSqlProperties
+
+ // Initialize JDBC DataSource
+ this.dataSource = new HikariDataSource();
- private final GreptimeProperties greptimeProperties;
+ // Construct JDBC URL: jdbc:postgresql://endpoint/database
+ String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
+ this.dataSource.setJdbcUrl(jdbcUrl);
+ // Fixed driver class name for PostgreSQL protocol
+ this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
+
+ if (greptimeProperties.username() != null) {
+ this.dataSource.setUsername(greptimeProperties.username());
+ }
+ if (greptimeProperties.password() != null) {
+ this.dataSource.setPassword(greptimeProperties.password());
+ }
+ this.dataSource.setMaximumPoolSize(10);
+ this.dataSource.setMinimumIdle(2);
+ this.dataSource.setConnectionTimeout(30000);
+ this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+ log.info("Initialized GreptimeDB JDBC connection to {}", jdbcUrl);
+ }
+
+ /**
+ * Constructor for compatibility with existing tests.
+ * delegating to the main constructor.
+ * @param greptimeProperties greptime properties
+ * @param restTemplate (unused) rest template
+ */
public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
- super(restTemplate, new
SqlQueryExecutor.HttpSqlProperties(greptimeProperties.httpEndpoint() +
QUERY_PATH,
- greptimeProperties.username(), greptimeProperties.password()));
- this.greptimeProperties = greptimeProperties;
+ this(greptimeProperties);
+ }
+
+ /**
+ * Constructor for testing purposes only.
+ * @param jdbcTemplate Mocked JdbcTemplate
+ */
+ public GreptimeSqlQueryExecutor(JdbcTemplate jdbcTemplate) {
+ super(null, null);
+ this.dataSource = null;
+ this.jdbcTemplate = jdbcTemplate;
}
@Override
- public List<Map<String, Object>> execute(String queryString) {
- List<Map<String, Object>> results = new LinkedList<>();
+ public List<Map<String, Object>> execute(String sql) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
+ return jdbcTemplate.queryForList(sql);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String requestBody = "sql=" + queryString;
- HttpEntity<String> httpEntity = new HttpEntity<>(requestBody,
headers);
+ public int delete(String sql, Object... args) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
+ try {
+ Integer result = jdbcTemplate.execute(sql,
(PreparedStatementCallback<Integer>) ps -> {
+ if (args != null && args.length > 0) {
+ ArgumentPreparedStatementSetter setter = new
ArgumentPreparedStatementSetter(args);
+ setter.setValues(ps);
+ }
+ boolean hasResultSet = ps.execute();
+ int updateCount = ps.getUpdateCount();
+ if (updateCount == -1 && hasResultSet) {
+ log.warn("GreptimeDB returned a ResultSet for DELETE
operation. Ignoring protocol error and assuming success.");
+ return 0;
+ }
+ return updateCount;
+ });
+ return result != null ? result : 0;
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String url = greptimeProperties.httpEndpoint() + QUERY_PATH;
- if (StringUtils.hasText(greptimeProperties.database())) {
- url += "?db=" + greptimeProperties.database();
- }
+ /**
+ * Execute SQL query with arguments (Prepared Statement)
+ * @param sql SQL query with ? placeholders
+ * @param args Arguments for placeholders
+ * @return List of rows
+ */
+ public List<LogEntry> query(String sql, Object... args) {
+ log.debug("Executing GreptimeDB SQL: {} with args: {}", sql, args);
+ try {
+ // Use custom RowMapper that extends BeanPropertyRowMapper
+ return jdbcTemplate.query(sql, new GreptimeLogEntryRowMapper(),
args);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- ResponseEntity<GreptimeSqlQueryContent> responseEntity =
restTemplate.exchange(url,
- HttpMethod.POST, httpEntity,
GreptimeSqlQueryContent.class);
-
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- GreptimeSqlQueryContent responseBody =
responseEntity.getBody();
- if (responseBody != null && responseBody.getCode() == 0
- && responseBody.getOutput() != null &&
!responseBody.getOutput().isEmpty()) {
-
- for (GreptimeSqlQueryContent.Output output :
responseBody.getOutput()) {
- if (output.getRecords() != null &&
output.getRecords().getRows() != null) {
- GreptimeSqlQueryContent.Output.Records.Schema
schema = output.getRecords().getSchema();
- List<List<Object>> rows =
output.getRecords().getRows();
-
- for (List<Object> row : rows) {
- Map<String, Object> rowMap = new HashMap<>();
- if (schema != null &&
schema.getColumnSchemas() != null) {
- for (int i = 0; i <
Math.min(schema.getColumnSchemas().size(), row.size()); i++) {
- String columnName =
schema.getColumnSchemas().get(i).getName();
- Object value = row.get(i);
- rowMap.put(columnName, value);
- }
- } else {
- for (int i = 0; i < row.size(); i++) {
- rowMap.put("col_" + i, row.get(i));
- }
- }
- results.add(rowMap);
- }
- }
- }
- }
- } else {
- log.error("query metrics data from greptime failed. {}",
responseEntity);
- }
+ /**
+ * Execute count SQ
+ * @param sql SQL
+ * @return count
+ */
+ public Long count(String sql, Object... args) {
+ try {
+ return jdbcTemplate.queryForObject(sql, Long.class, args);
} catch (Exception e) {
- log.error("query metrics data from greptime error. {}",
e.getMessage(), e);
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
}
- return results;
}
@Override
public String getDatasource() {
return DATASOURCE;
}
-}
+
+ // Ensure to close the datasource when the bean is destroyed
+ public void close() {
+ if (this.dataSource != null && !this.dataSource.isClosed()) {
+ this.dataSource.close();
+ }
+ }
Review Comment:
The `close()` method should be annotated with `@PreDestroy` from
`javax.annotation` (or `jakarta.annotation` for Jakarta EE) to ensure it's
automatically called when the Spring bean is destroyed. Without this
annotation, the datasource may not be properly closed on application shutdown.
##########
web-app/src/assets/i18n/en-US.json:
##########
@@ -682,6 +682,8 @@
"log.stream.severity-number-placeholder": "Enter Severity Number",
"log.stream.severity-text": "Severity:",
"log.stream.severity-text-placeholder": "Enter Severity",
+ "log.stream.content" : "Log Content",
+ "log.stream.content-placeholder" : "Enter Log Content",
Review Comment:
There are extra spaces around the colon in the JSON key-value pairs (` : `
instead of `: `). While this is valid JSON, it's inconsistent with the rest of
the file. For consistency, these should match the formatting of other entries
(e.g., line 682-684).
```suggestion
"log.stream.content": "Log Content",
"log.stream.content-placeholder": "Enter Log Content",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]