This is an automated email from the ASF dual-hosted git repository.
cyyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 7f4c2e0487 [refactor] Using Spring jdbc to query Greptime log (#3880)
7f4c2e0487 is described below
commit 7f4c2e0487155bc7856b221395646e4c055a9d3f
Author: Logic <[email protected]>
AuthorDate: Wed Dec 3 12:11:08 2025 +0800
[refactor] Using Spring jdbc to query Greptime log (#3880)
Signed-off-by: Yang Chen <[email protected]>
Co-authored-by: Yang Chen <[email protected]>
---
.../log/alert/LogPeriodicAlertE2eTest.java | 15 +-
.../log/storage/GreptimeLogStorageE2eTest.java | 23 +-
.../log/controller/LogQueryController.java | 161 +++++------
.../log/controller/LogQueryControllerTest.java | 111 ++++----
.../src/main/resources/application-test.yml | 1 +
.../src/main/resources/application.yml | 1 +
hertzbeat-warehouse/pom.xml | 25 +-
.../warehouse/db/GreptimeSqlQueryExecutor.java | 234 +++++++++++-----
.../store/history/tsdb/HistoryDataReader.java | 34 ++-
.../tsdb/greptime/GreptimeDbDataStorage.java | 294 ++++++---------------
.../history/tsdb/greptime/GreptimeProperties.java | 21 +-
.../warehouse/db/GreptimeSqlQueryExecutorTest.java | 107 +++-----
.../tsdb/greptime/GreptimeDbDataStorageTest.java | 96 +++----
script/application.yml | 1 +
.../conf/application.yml | 1 +
.../log/log-manage/log-manage.component.html | 14 +-
.../routes/log/log-manage/log-manage.component.ts | 10 +-
web-app/src/app/service/log.service.ts | 14 +-
18 files changed, 574 insertions(+), 589 deletions(-)
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
index e3c387dbe6..8628d5b72f 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -70,6 +70,7 @@ public class LogPeriodicAlertE2eTest {
private static final String GREPTIME_IMAGE = "greptime/greptimedb:latest";
private static final int GREPTIME_HTTP_PORT = 4000;
private static final int GREPTIME_GRPC_PORT = 4001;
+ private static final int GREPTIME_PG_PORT = 4003;
private static final Duration CONTAINER_STARTUP_TIMEOUT =
Duration.ofSeconds(120);
@LocalServerPort
@@ -85,16 +86,17 @@ public class LogPeriodicAlertE2eTest {
private AlarmCommonReduce alarmCommonReduce;
static GenericContainer<?> vector;
-
+
static GenericContainer<?> greptimedb;
static {
greptimedb = new
GenericContainer<>(DockerImageName.parse(GREPTIME_IMAGE))
- .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT)
+ .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT,
GREPTIME_PG_PORT)
.withCommand("standalone", "start",
"--http-addr", "0.0.0.0:" + GREPTIME_HTTP_PORT,
- "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT)
- .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT))
+ "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT,
+ "--postgres-addr", "0.0.0.0:" + GREPTIME_PG_PORT)
+ .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT, GREPTIME_PG_PORT))
.withStartupTimeout(CONTAINER_STARTUP_TIMEOUT);
greptimedb.start();
}
@@ -106,6 +108,7 @@ public class LogPeriodicAlertE2eTest {
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
+ r.add("warehouse.store.greptime.postgres-endpoint", () -> "localhost:"
+ greptimedb.getMappedPort(GREPTIME_PG_PORT));
r.add("warehouse.store.greptime.username", () -> "");
r.add("warehouse.store.greptime.password", () -> "");
}
@@ -227,4 +230,4 @@ public class LogPeriodicAlertE2eTest {
errorLabelsByIndividual.put("type", "error_count");
errorCountAlertByIndividual.setLabels(errorLabelsByIndividual);
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
index 4aba2f1eb2..f491d8562f 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -59,6 +59,7 @@ public class GreptimeLogStorageE2eTest {
private static final String GREPTIME_IMAGE = "greptime/greptimedb:latest";
private static final int GREPTIME_HTTP_PORT = 4000;
private static final int GREPTIME_GRPC_PORT = 4001;
+ private static final int GREPTIME_PG_PORT = 4003;
private static final Duration CONTAINER_STARTUP_TIMEOUT =
Duration.ofSeconds(120);
@LocalServerPort
@@ -75,11 +76,12 @@ public class GreptimeLogStorageE2eTest {
static {
greptimedb = new
GenericContainer<>(DockerImageName.parse(GREPTIME_IMAGE))
- .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT)
+ .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT,
GREPTIME_PG_PORT)
.withCommand("standalone", "start",
"--http-addr", "0.0.0.0:" + GREPTIME_HTTP_PORT,
- "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT)
- .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT))
+ "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT,
+ "--postgres-addr", "0.0.0.0:" + GREPTIME_PG_PORT)
+ .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT, GREPTIME_PG_PORT))
.withStartupTimeout(CONTAINER_STARTUP_TIMEOUT);
greptimedb.start();
}
@@ -90,6 +92,7 @@ public class GreptimeLogStorageE2eTest {
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
+ r.add("warehouse.store.greptime.postgres-endpoint", () -> "localhost:"
+ greptimedb.getMappedPort(GREPTIME_PG_PORT));
r.add("warehouse.store.greptime.username", () -> "");
r.add("warehouse.store.greptime.password", () -> "");
}
@@ -116,7 +119,7 @@ public class GreptimeLogStorageE2eTest {
void testLogStorageToGreptimeDb() {
List<LogEntry> capturedLogs = new ArrayList<>();
-
+
// Wait for Vector to generate and send logs to HertzBeat
await().atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(3))
@@ -131,7 +134,7 @@ public class GreptimeLogStorageE2eTest {
Thread.currentThread().interrupt();
throw new RuntimeException("Test interrupted", e);
}
-
+
// Assert that we have captured at least some logs
assertFalse(capturedLogs.isEmpty(), "Should have captured
at least one log entry");
});
@@ -142,7 +145,7 @@ public class GreptimeLogStorageE2eTest {
assertNotNull(firstLog, "First log should not be null");
assertNotNull(firstLog.getBody(), "Log body should not be null");
assertNotNull(firstLog.getSeverityText(), "Severity text should not be
null");
-
+
// Additional wait to ensure logs are persisted to GreptimeDB
await().atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
@@ -159,8 +162,8 @@ public class GreptimeLogStorageE2eTest {
private List<LogEntry> queryStoredLogs() {
long endTime = System.currentTimeMillis();
long startTime = endTime - Duration.ofMinutes(5).toMillis(); // Look
back 5 minutes
-
+
return greptimeDbDataStorage.queryLogsByMultipleConditions(
- startTime, endTime, null, null, null, null);
+ startTime, endTime, null, null, null, null, null);
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
index 2de4de5ab2..5d0bed1f45 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
@@ -61,50 +61,54 @@ public class LogQueryController {
}
@GetMapping("/list")
- @Operation(summary = "Query logs by time range with optional filters",
- description = "Query logs by [start,end] in ms and optional
filters with pagination. Returns paginated log entries sorted by timestamp in
descending order.")
+ @Operation(summary = "Query logs by time range with optional filters",
+ description = "Query logs by [start,end] in ms and optional
filters with pagination. Returns paginated log entries sorted by timestamp in
descending order.")
public ResponseEntity<Message<Page<LogEntry>>> list(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
@RequestParam(value = "severityText", required = false) String
severityText,
- @Parameter(description = "Page index starting from 0", example =
"0")
+ @Parameter(description = "Log content search keyword", example =
"error")
+ @RequestParam(value = "search", required = false) String search,
+ @Parameter(description = "Page index starting from 0", example =
"0")
@RequestParam(value = "pageIndex", required = false, defaultValue
= "0") Integer pageIndex,
- @Parameter(description = "Number of items per page", example =
"20")
+ @Parameter(description = "Number of items per page", example =
"20")
@RequestParam(value = "pageSize", required = false, defaultValue =
"20") Integer pageSize) {
- Page<LogEntry> result = getPagedLogs(start, end, traceId, spanId,
severityNumber, severityText, pageIndex, pageSize);
+ Page<LogEntry> result = getPagedLogs(start, end, traceId, spanId,
severityNumber, severityText, search, pageIndex, pageSize);
return ResponseEntity.ok(Message.success(result));
}
@GetMapping("/stats/overview")
- @Operation(summary = "Log overview statistics",
- description = "Overall counts and basic statistics with
filters. Provides counts by severity levels according to OpenTelemetry
standard.")
+ @Operation(summary = "Log overview statistics",
+ description = "Overall counts and basic statistics with filters.
Provides counts by severity levels according to OpenTelemetry standard.")
public ResponseEntity<Message<Map<String, Object>>> overviewStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText,
+ @Parameter(description = "Log content search keyword", example =
"error")
+ @RequestParam(value = "search", required = false) String search) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
+
Map<String, Object> overview = new HashMap<>();
overview.put("totalCount", logs.size());
-
+
// Count by severity levels according to OpenTelemetry standard
// TRACE: 1-4, DEBUG: 5-8, INFO: 9-12, WARN: 13-16, ERROR: 17-20,
FATAL: 21-24
long fatalCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 21 && log.getSeverityNumber() <=
24).count();
@@ -113,114 +117,117 @@ public class LogQueryController {
long infoCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 9 && log.getSeverityNumber() <=
12).count();
long debugCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 5 && log.getSeverityNumber() <=
8).count();
long traceCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 1 && log.getSeverityNumber() <=
4).count();
-
+
overview.put("fatalCount", fatalCount);
overview.put("errorCount", errorCount);
overview.put("warnCount", warnCount);
overview.put("infoCount", infoCount);
overview.put("debugCount", debugCount);
overview.put("traceCount", traceCount);
-
+
return ResponseEntity.ok(Message.success(overview));
}
@GetMapping("/stats/trace-coverage")
- @Operation(summary = "Trace coverage statistics",
- description = "Statistics about trace information availability.
Shows how many logs have trace IDs, span IDs, or both for distributed tracing
analysis.")
+ @Operation(summary = "Trace coverage statistics",
+ description = "Statistics about trace information availability.
Shows how many logs have trace IDs, span IDs, or both for distributed tracing
analysis.")
public ResponseEntity<Message<Map<String, Object>>> traceCoverageStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText,
+ @Parameter(description = "Log content search keyword", example =
"error")
+ @RequestParam(value = "search", required = false) String search) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
+
Map<String, Object> result = new HashMap<>();
-
+
// Trace coverage statistics
long withTraceId = logs.stream().filter(log -> log.getTraceId() !=
null && !log.getTraceId().isEmpty()).count();
long withSpanId = logs.stream().filter(log -> log.getSpanId() != null
&& !log.getSpanId().isEmpty()).count();
- long withBothTraceAndSpan = logs.stream().filter(log ->
- log.getTraceId() != null && !log.getTraceId().isEmpty()
- && log.getSpanId() != null &&
!log.getSpanId().isEmpty()).count();
+ long withBothTraceAndSpan = logs.stream().filter(log ->
+ log.getTraceId() != null && !log.getTraceId().isEmpty()
+ && log.getSpanId() != null &&
!log.getSpanId().isEmpty()).count();
long withoutTrace = logs.size() - withTraceId;
-
+
Map<String, Long> traceCoverage = new HashMap<>();
traceCoverage.put("withTrace", withTraceId);
traceCoverage.put("withoutTrace", withoutTrace);
traceCoverage.put("withSpan", withSpanId);
traceCoverage.put("withBothTraceAndSpan", withBothTraceAndSpan);
-
+
result.put("traceCoverage", traceCoverage);
return ResponseEntity.ok(Message.success(result));
}
@GetMapping("/stats/trend")
- @Operation(summary = "Log trend over time",
- description = "Count logs by hour intervals with filters.
Groups logs by hour and provides time-series data for trend analysis.")
+ @Operation(summary = "Log trend over time",
+ description = "Count logs by hour intervals with filters. Groups
logs by hour and provides time-series data for trend analysis.")
public ResponseEntity<Message<Map<String, Object>>> trendStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText,
+ @Parameter(description = "Log content search keyword", example =
"error")
+ @RequestParam(value = "search", required = false) String search) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
+
// Group by hour
Map<String, Long> hourlyStats = logs.stream()
- .filter(log -> log.getTimeUnixNano() != null)
- .collect(Collectors.groupingBy(
- log -> {
- long timestampMs = log.getTimeUnixNano() / 1_000_000L;
- LocalDateTime dateTime = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(timestampMs),
- ZoneId.systemDefault());
- return
dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00"));
- },
- Collectors.counting()));
-
+ .filter(log -> log.getTimeUnixNano() != null)
+ .collect(Collectors.groupingBy(
+ log -> {
+ long timestampMs = log.getTimeUnixNano() /
1_000_000L;
+ LocalDateTime dateTime = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(timestampMs),
+ ZoneId.systemDefault());
+ return
dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00"));
+ },
+ Collectors.counting()));
+
Map<String, Object> result = new HashMap<>();
result.put("hourlyStats", hourlyStats);
return ResponseEntity.ok(Message.success(result));
}
- private List<LogEntry> getFilteredLogs(Long start, Long end, String
traceId, String spanId,
- Integer severityNumber, String
severityText) {
+ private List<LogEntry> getFilteredLogs(Long start, Long end, String
traceId, String spanId,
+ Integer severityNumber, String
severityText, String search) {
// Use the new multi-condition query method
- return historyDataReader.queryLogsByMultipleConditions(start, end,
traceId, spanId, severityNumber, severityText);
+ return historyDataReader.queryLogsByMultipleConditions(start, end,
traceId, spanId, severityNumber, severityText, search);
}
- private Page<LogEntry> getPagedLogs(Long start, Long end, String traceId,
String spanId,
- Integer severityNumber, String
severityText, Integer pageIndex, Integer pageSize) {
+ private Page<LogEntry> getPagedLogs(Long start, Long end, String traceId,
String spanId,
+ Integer severityNumber, String
severityText, String search,
+ Integer pageIndex, Integer pageSize) {
// Calculate pagination parameters
int offset = pageIndex * pageSize;
-
+
// Get total count and paginated data
- long totalElements =
historyDataReader.countLogsByMultipleConditions(start, end, traceId, spanId,
severityNumber, severityText);
+ long totalElements =
historyDataReader.countLogsByMultipleConditions(start, end, traceId, spanId,
severityNumber, severityText, search);
List<LogEntry> pagedLogs =
historyDataReader.queryLogsByMultipleConditionsWithPagination(
- start, end, traceId, spanId, severityNumber, severityText, offset,
pageSize);
-
+ start, end, traceId, spanId, severityNumber, severityText,
search, offset, pageSize);
+
// Create PageRequest (sorted by timestamp descending)
Sort sort = Sort.by(Sort.Direction.DESC, "timeUnixNano");
PageRequest pageRequest = PageRequest.of(pageIndex, pageSize, sort);
-
+
// Return Spring Data Page object
return new PageImpl<>(pagedLogs, pageRequest, totalElements);
}
-}
-
-
+}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
index 96173c2a24..3c1afb932a 100644
---
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
+++
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -87,24 +87,26 @@ class LogQueryControllerTest {
List<LogEntry> mockLogs = Arrays.asList(logEntry1, logEntry2);
- when(historyDataReader.countLogsByMultipleConditions(anyLong(),
anyLong(), anyString(),
- anyString(), anyInt(), anyString())).thenReturn(2L);
-
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(anyLong(),
anyLong(),
- anyString(), anyString(), anyInt(), anyString(), anyInt(),
anyInt()))
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.countLogsByMultipleConditions(anyLong(),
anyLong(), anyString(),
+ anyString(), anyInt(), anyString(), isNull())).thenReturn(2L);
+ // Fixed: Added searchContent param matcher (7th arg)
+
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(anyLong(),
anyLong(),
+ anyString(), anyString(), anyInt(), anyString(), isNull(),
anyInt(), anyInt()))
.thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/list")
- .param("start", "1734005477000")
- .param("end", "1734005478000")
- .param("traceId", "trace123")
- .param("spanId", "span456")
- .param("severityNumber", "9")
- .param("severityText", "INFO")
- .param("pageIndex", "0")
- .param("pageSize", "20")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/list")
+ .param("start", "1734005477000")
+ .param("end", "1734005478000")
+ .param("traceId", "trace123")
+ .param("spanId", "span456")
+ .param("severityNumber", "9")
+ .param("severityText", "INFO")
+ .param("pageIndex", "0")
+ .param("pageSize", "20")
+ )
.andDo(print())
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
@@ -127,16 +129,18 @@ class LogQueryControllerTest {
.build()
);
- when(historyDataReader.countLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull())).thenReturn(1L);
-
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(isNull(),
isNull(),
- isNull(), isNull(), isNull(), isNull(), eq(0), eq(20)))
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.countLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(1L);
+ // Fixed: Added searchContent param matcher (7th arg)
+
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(isNull(),
isNull(),
+ isNull(), isNull(), isNull(), isNull(), isNull(), eq(0),
eq(20)))
.thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/list")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/list")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.content").isArray())
@@ -163,13 +167,14 @@ class LogQueryControllerTest {
LogEntry.builder().severityNumber(21).build()
);
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/overview")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/overview")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.totalCount").value(8))
@@ -188,15 +193,16 @@ class LogQueryControllerTest {
LogEntry.builder().severityNumber(17).build()
);
-
when(historyDataReader.queryLogsByMultipleConditions(eq(1734005477000L),
eq(1734005478000L),
- isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ // Fixed: Added searchContent param matcher (7th arg)
+
when(historyDataReader.queryLogsByMultipleConditions(eq(1734005477000L),
eq(1734005478000L),
+ isNull(), isNull(), isNull(), isNull(),
isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/overview")
- .param("start", "1734005477000")
- .param("end", "1734005478000")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/overview")
+ .param("start", "1734005477000")
+ .param("end", "1734005478000")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.totalCount").value(2));
@@ -217,13 +223,14 @@ class LogQueryControllerTest {
LogEntry.builder().build() // null values
);
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trace-coverage")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trace-coverage")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.traceCoverage.withTrace").value(3))
@@ -244,13 +251,14 @@ class LogQueryControllerTest {
LogEntry.builder().timeUnixNano(1734009077630000000L).build()
);
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trend")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trend")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.hourlyStats").isMap());
@@ -263,16 +271,17 @@ class LogQueryControllerTest {
LogEntry.builder().timeUnixNano(null).build() // This should
be filtered out
);
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ // Fixed: Added searchContent param matcher (7th arg)
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trend")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trend")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.hourlyStats").isMap());
}
-}
+}
\ No newline at end of file
diff --git a/hertzbeat-startup/src/main/resources/application-test.yml
b/hertzbeat-startup/src/main/resources/application-test.yml
index 4aa13715ff..53bcfa3b00 100644
--- a/hertzbeat-startup/src/main/resources/application-test.yml
+++ b/hertzbeat-startup/src/main/resources/application-test.yml
@@ -91,6 +91,7 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
+ postgres-endpoint: localhost:4003
database: public
username: greptime
password: greptime
diff --git a/hertzbeat-startup/src/main/resources/application.yml
b/hertzbeat-startup/src/main/resources/application.yml
index e141b4606c..8d36940e2c 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -209,6 +209,7 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
+ postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index 95a32a5012..0d4ba07e18 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -29,24 +29,20 @@
<name>${project.artifactId}</name>
<dependencies>
- <!-- common -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-common</artifactId>
<scope>provided</scope>
</dependency>
- <!-- plugin -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-plugin</artifactId>
<scope>provided</scope>
</dependency>
- <!-- util -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
- <!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -61,13 +57,16 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
- <!-- taos-jdbc driver -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jdbc</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>${taos-jdbcdriver.version}</version>
</dependency>
- <!-- IoTDB -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
@@ -83,19 +82,16 @@
</exclusion>
</exclusions>
</dependency>
- <!-- QuestDB -->
<dependency>
<groupId>org.questdb</groupId>
<artifactId>questdb</artifactId>
<version>${questdb.version}</version>
</dependency>
- <!-- influxdb Here, support for version 1.7, use influxdb-java, Full
support for version 2.x requires influxdb-client-java -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
- <!-- greptimedb -->
<dependency>
<groupId>io.greptime</groupId>
<artifactId>ingester-all</artifactId>
@@ -119,7 +115,6 @@
</exclusion>
</exclusions>
</dependency>
- <!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -130,13 +125,11 @@
</exclusion>
</exclusions>
</dependency>
- <!--redis-->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<scope>provided</scope>
</dependency>
- <!-- swagger -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
@@ -150,11 +143,15 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
index 7e4c64341b..aab903b745 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,111 +19,199 @@
package org.apache.hertzbeat.warehouse.db;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.util.Base64Util;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
+import org.apache.hertzbeat.common.util.JsonUtil;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.MediaType;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.beans.PropertyDescriptor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL
+ * query executor for GreptimeDB SQL via JDBC
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
- private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
+ private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
+ private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
- private final GreptimeProperties greptimeProperties;
+ private final JdbcTemplate jdbcTemplate;
+ private final HikariDataSource dataSource;
+ @Autowired
+ public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
+ super(null, null); // No longer using RestTemplate or HttpSqlProperties
+ // Initialize JDBC DataSource
+ this.dataSource = new HikariDataSource();
+
+ // Construct JDBC URL: jdbc:postgresql://endpoint/database
+ String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
+ this.dataSource.setJdbcUrl(jdbcUrl);
+
+ // Fixed driver class name for PostgreSQL protocol
+ this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
+
+ if (greptimeProperties.username() != null) {
+ this.dataSource.setUsername(greptimeProperties.username());
+ }
+ if (greptimeProperties.password() != null) {
+ this.dataSource.setPassword(greptimeProperties.password());
+ }
+ this.dataSource.setMaximumPoolSize(10);
+ this.dataSource.setMinimumIdle(2);
+ this.dataSource.setConnectionTimeout(30000);
+
+ this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+ log.info("Initialized GreptimeDB JDBC connection to {}", jdbcUrl);
+ }
+
+ /**
+ * Constructor for compatibility with existing tests.
+ * Delegates to the main constructor.
+ * @param greptimeProperties greptime properties
+ * @param restTemplate (unused) rest template
+ */
public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
- super(restTemplate, new
SqlQueryExecutor.HttpSqlProperties(greptimeProperties.httpEndpoint() +
QUERY_PATH,
- greptimeProperties.username(), greptimeProperties.password()));
- this.greptimeProperties = greptimeProperties;
+ this(greptimeProperties);
+ }
+
+ /**
+ * Constructor for testing purposes only.
+ * @param jdbcTemplate Mocked JdbcTemplate
+ */
+ public GreptimeSqlQueryExecutor(JdbcTemplate jdbcTemplate) {
+ super(null, null);
+ this.dataSource = null;
+ this.jdbcTemplate = jdbcTemplate;
}
@Override
- public List<Map<String, Object>> execute(String queryString) {
- List<Map<String, Object>> results = new LinkedList<>();
+ public List<Map<String, Object>> execute(String sql) {
+ log.debug("Executing GreptimeDB SQL: {}", sql);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
- }
-
- String requestBody = "sql=" + queryString;
- HttpEntity<String> httpEntity = new HttpEntity<>(requestBody,
headers);
+ return jdbcTemplate.queryForList(sql);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- String url = greptimeProperties.httpEndpoint() + QUERY_PATH;
- if (StringUtils.hasText(greptimeProperties.database())) {
- url += "?db=" + greptimeProperties.database();
- }
+ /**
+ * Execute SQL query with arguments (Prepared Statement)
+ * @param sql SQL query with ? placeholders
+ * @param args Arguments for placeholders
+ * @return list of LogEntry objects mapped from the result rows
+ */
+ public List<LogEntry> query(String sql, Object... args) {
+ log.debug("Executing GreptimeDB SQL: {} with args: {}", sql, args);
+ try {
+ // Use custom RowMapper that extends BeanPropertyRowMapper
+ return jdbcTemplate.query(sql, new GreptimeLogEntryRowMapper(),
args);
+ } catch (Exception e) {
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
+ }
+ }
- ResponseEntity<GreptimeSqlQueryContent> responseEntity =
restTemplate.exchange(url,
- HttpMethod.POST, httpEntity,
GreptimeSqlQueryContent.class);
-
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- GreptimeSqlQueryContent responseBody =
responseEntity.getBody();
- if (responseBody != null && responseBody.getCode() == 0
- && responseBody.getOutput() != null &&
!responseBody.getOutput().isEmpty()) {
-
- for (GreptimeSqlQueryContent.Output output :
responseBody.getOutput()) {
- if (output.getRecords() != null &&
output.getRecords().getRows() != null) {
- GreptimeSqlQueryContent.Output.Records.Schema
schema = output.getRecords().getSchema();
- List<List<Object>> rows =
output.getRecords().getRows();
-
- for (List<Object> row : rows) {
- Map<String, Object> rowMap = new HashMap<>();
- if (schema != null &&
schema.getColumnSchemas() != null) {
- for (int i = 0; i <
Math.min(schema.getColumnSchemas().size(), row.size()); i++) {
- String columnName =
schema.getColumnSchemas().get(i).getName();
- Object value = row.get(i);
- rowMap.put(columnName, value);
- }
- } else {
- for (int i = 0; i < row.size(); i++) {
- rowMap.put("col_" + i, row.get(i));
- }
- }
- results.add(rowMap);
- }
- }
- }
- }
- } else {
- log.error("query metrics data from greptime failed. {}",
responseEntity);
- }
+ /**
+ * Execute count SQL
+ * @param sql SQL
+ * @return count
+ */
+ public Long count(String sql, Object... args) {
+ try {
+ return jdbcTemplate.queryForObject(sql, Long.class, args);
} catch (Exception e) {
- log.error("query metrics data from greptime error. {}",
e.getMessage(), e);
+ log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
+ throw e;
}
- return results;
}
@Override
public String getDatasource() {
return DATASOURCE;
}
-}
+
+ // Make sure the datasource is closed when the bean is destroyed
+ public void close() {
+ if (this.dataSource != null && !this.dataSource.isClosed()) {
+ this.dataSource.close();
+ }
+ }
+
+ /**
+ * Custom RowMapper that extends BeanPropertyRowMapper to leverage default
mapping
+ * while overriding specific fields that need type conversion (Timestamp
-> Long, JSON String -> Object).
+ */
+ private static class GreptimeLogEntryRowMapper extends
BeanPropertyRowMapper<LogEntry> {
+
+ public GreptimeLogEntryRowMapper() {
+ super(LogEntry.class);
+ }
+
+ @Override
+ protected Object getColumnValue(ResultSet rs, int index,
PropertyDescriptor pd) throws SQLException {
+ String propertyName = pd.getName();
+
+ // 1. Handle Timestamp to Long (nanoseconds) conversion
+ if ("timeUnixNano".equals(propertyName) ||
"observedTimeUnixNano".equals(propertyName)) {
+ Timestamp timestamp = rs.getTimestamp(index);
+ if (timestamp == null) {
+ return null;
+ }
+ long seconds = timestamp.getTime() / 1000;
+ long nanos = timestamp.getNanos();
+ return seconds * 1_000_000_000L + nanos;
+ }
+
+ // 2. Handle JSON String to Map conversion (attributes, resource)
+ if ("attributes".equals(propertyName) ||
"resource".equals(propertyName)) {
+ String json = rs.getString(index);
+ if (!StringUtils.hasText(json)) {
+ return null;
+ }
+ try {
+ return JsonUtil.fromJson(json, new
TypeReference<Map<String, Object>>() {});
+ } catch (Exception e) {
+ log.warn("Failed to parse JSON map for {}: {}",
propertyName, json);
+ return null;
+ }
+ }
+
+ // 3. Handle JSON String to InstrumentationScope conversion
+ if ("instrumentationScope".equals(propertyName)) {
+ String json = rs.getString(index);
+ if (!StringUtils.hasText(json)) {
+ return null;
+ }
+ try {
+ return JsonUtil.fromJson(json,
LogEntry.InstrumentationScope.class);
+ } catch (Exception e) {
+ log.warn("Failed to parse instrumentationScope: {}", json);
+ return null;
+ }
+ }
+
+ // 4. Default handling for other fields (traceId, spanId,
severityText, body, etc.)
+ return super.getColumnValue(rs, index, pd);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
index 89fe0f3adb..567e27177e 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
@@ -65,45 +65,67 @@ public interface HistoryDataReader {
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
+ * @param searchContent search content in log body
* @return filtered log entries
*/
default List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long
endTime, String traceId,
String spanId,
Integer severityNumber,
- String severityText) {
+ String severityText,
String searchContent) {
throw new UnsupportedOperationException("query logs by multiple
conditions is not supported");
}
/**
- * Query logs with multiple filter conditions and pagination
+ * Query logs with multiple filter conditions and pagination (Legacy)
+ */
+ default List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
+ String
spanId, Integer severityNumber,
+ String
severityText, Integer offset, Integer limit) {
+ return queryLogsByMultipleConditionsWithPagination(startTime, endTime,
traceId, spanId, severityNumber, severityText, null, offset, limit);
+ }
+
+ /**
+ * Query logs with multiple filter conditions and pagination including
search content
* @param startTime start time in milliseconds
* @param endTime end time in milliseconds
* @param traceId trace ID filter
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
+ * @param searchContent search content in log body
* @param offset pagination offset
* @param limit pagination limit
* @return filtered log entries with pagination
*/
default List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
String
spanId, Integer severityNumber,
- String
severityText, Integer offset, Integer limit) {
+ String
severityText, String searchContent,
+ Integer
offset, Integer limit) {
throw new UnsupportedOperationException("query logs by multiple
conditions with pagination is not supported");
}
/**
- * Count logs with multiple filter conditions
+ * Count logs with multiple filter conditions (Legacy)
+ */
+ default long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
+ String spanId, Integer
severityNumber,
+ String severityText) {
+ return countLogsByMultipleConditions(startTime, endTime, traceId,
spanId, severityNumber, severityText, null);
+ }
+
+ /**
+ * Count logs with multiple filter conditions including search content
* @param startTime start time in milliseconds
* @param endTime end time in milliseconds
* @param traceId trace ID filter
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
+ * @param searchContent search content in log body
* @return count of matching log entries
*/
default long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
String spanId, Integer
severityNumber,
- String severityText) {
+ String severityText, String
searchContent) {
throw new UnsupportedOperationException("count logs by multiple
conditions is not supported");
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index c2834f5ab9..c62e66a035 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -26,29 +26,6 @@ import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.ZonedDateTime;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAmount;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hertzbeat.common.constants.CommonConstants;
@@ -60,9 +37,9 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.Base64Util;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.common.util.TimePeriodUtil;
+import org.apache.hertzbeat.warehouse.db.GreptimeSqlQueryExecutor;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
import org.apache.hertzbeat.warehouse.store.history.tsdb.vm.PromQlQueryContent;
-import org.apache.hertzbeat.warehouse.db.GreptimeSqlQueryExecutor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@@ -76,8 +53,30 @@ import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
- * GreptimeDB data storage, only supports GreptimeDB version >= v0.5
+ * GreptimeDB data storage
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
@@ -95,11 +94,8 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
private static final int LOG_BATCH_SIZE = 500;
private GreptimeDB greptimeDb;
-
private final GreptimeProperties greptimeProperties;
-
private final RestTemplate restTemplate;
-
private final GreptimeSqlQueryExecutor greptimeSqlQueryExecutor;
public GreptimeDbDataStorage(GreptimeProperties greptimeProperties,
RestTemplate restTemplate,
@@ -127,7 +123,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
log.error("[warehouse greptime] Fail to start GreptimeDB client");
return false;
}
-
return true;
}
@@ -166,19 +161,15 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
-
AtomicInteger index = new AtomicInteger(-1);
rowWrapper.cellStream().forEach(cell -> {
index.getAndIncrement();
-
if (CommonConstants.NULL_VALUE.equals(cell.getValue())) {
values[2 + index.get()] = null;
return;
}
-
Boolean label =
cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
Byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
-
if (label) {
values[2 + index.get()] = cell.getValue();
} else {
@@ -189,10 +180,8 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
}
}
});
-
table.addRow(values);
}
-
CompletableFuture<Result<WriteOk, Err>> writeFuture =
greptimeDb.write(table);
try {
Result<WriteOk, Err> result = writeFuture.get(10,
TimeUnit.SECONDS);
@@ -212,7 +201,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
Map<String, Long> timeRange = getTimeRange(history);
Long start = timeRange.get(LABEL_KEY_START_TIME);
Long end = timeRange.get(LABEL_KEY_END_TIME);
-
String step = getTimeStep(start, end);
return getHistoryData(start, end, step, instance, app, metrics,
metric);
@@ -228,7 +216,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
Map<String, Long> timeRange = getTimeRange(history);
Long start = timeRange.get(LABEL_KEY_START_TIME);
Long end = timeRange.get(LABEL_KEY_END_TIME);
-
String step = getTimeStep(start, end);
Map<String, List<Value>> instanceValuesMap = getHistoryData(start,
end, step, instance, app, metrics, metric);
@@ -237,32 +224,25 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
// Therefore, we restrict the valid range by obtaining the post-query
timeframe.
// Since `gretime`'s `end` excludes the specified time, we add 4 hours.
List<Value> values =
instanceValuesMap.get(instanceValuesMap.keySet().stream().toList().get(0));
- // effective time
long effectiveStart = values.get(0).getTime() / 1000;
long effectiveEnd = values.get(values.size() - 1).getTime() / 1000 +
Duration.ofHours(4).getSeconds();
-
String name = getTableName(metrics);
String timeSeriesSelector = name + "{" + LABEL_KEY_INSTANCE + "=\"" +
instance + "\"";
if (!CommonConstants.PROMETHEUS.equals(app)) {
timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD +
"=\"" + metric + "\"";
}
timeSeriesSelector = timeSeriesSelector + "}";
-
try {
- // max
String finalTimeSeriesSelector = timeSeriesSelector;
URI uri = getUri(effectiveStart, effectiveEnd, step, uriComponents
-> "max_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMax);
- // min
uri = getUri(effectiveStart, effectiveEnd, step, uriComponents ->
"min_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMin);
- // avg
uri = getUri(effectiveStart, effectiveEnd, step, uriComponents ->
"avg_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMean);
} catch (Exception e) {
log.error("query interval metrics data from greptime error. {}",
e.getMessage(), e);
}
-
return instanceValuesMap;
}
@@ -302,7 +282,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
* @return step
*/
private String getTimeStep(long start, long end) {
- // get step
String step = "60s";
if (end - start < Duration.ofDays(7).getSeconds() && end - start >
Duration.ofDays(1).getSeconds()) {
step = "1h";
@@ -331,11 +310,9 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
if (!CommonConstants.PROMETHEUS.equals(app)) {
timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD +
"=\"" + metric + "\"";
}
-
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
HttpEntity<Void> httpEntity = getHttpEntity();
-
String finalTimeSeriesSelector = timeSeriesSelector;
URI uri = getUri(start, end, step, uriComponents -> {
MultiValueMap<String, String> queryParams =
uriComponents.getQueryParams();
@@ -344,16 +321,13 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
}
return null;
});
-
ResponseEntity<PromQlQueryContent> responseEntity = null;
if (uri != null) {
- responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
}
if (responseEntity != null &&
responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from greptime success. {}", uri);
- if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null &&
responseEntity.getBody().getData().getResult() != null) {
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -388,8 +362,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
+ if (StringUtils.hasText(greptimeProperties.username()) &&
StringUtils.hasText(greptimeProperties.password())) {
String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + encodedAuth);
@@ -437,8 +410,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
return;
}
HttpEntity<Void> httpEntity = getHttpEntity();
- ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri, HttpMethod.GET, httpEntity,
PromQlQueryContent.class);
if (!responseEntity.getStatusCode().is2xxSuccessful()) {
log.error("query interval metrics data from greptime failed. {}",
responseEntity);
return;
@@ -475,6 +447,9 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
this.greptimeDb.shutdownGracefully();
this.greptimeDb = null;
}
+ if (this.greptimeSqlQueryExecutor != null) {
+ this.greptimeSqlQueryExecutor.close();
+ }
}
@Override
@@ -482,15 +457,13 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
if (!isServerAvailable()) {
return;
}
-
try {
- // Create table schema
TableSchema.Builder tableSchemaBuilder =
TableSchema.newBuilder(LOG_TABLE_NAME);
tableSchemaBuilder.addTimestamp("time_unix_nano",
DataType.TimestampNanosecond)
.addField("observed_time_unix_nano",
DataType.TimestampNanosecond)
.addField("severity_number", DataType.Int32)
.addField("severity_text", DataType.String)
- .addField("body", DataType.Json)
+ .addField("body", DataType.String)
.addField("trace_id", DataType.String)
.addField("span_id", DataType.String)
.addField("trace_flags", DataType.Int32)
@@ -498,16 +471,13 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
.addField("resource", DataType.Json)
.addField("instrumentation_scope", DataType.Json)
.addField("dropped_attributes_count", DataType.Int32);
-
Table table = Table.from(tableSchemaBuilder.build());
-
- // Convert LogEntry to table row
- Object[] values = new Object[] {
+ Object[] values = new Object[]{
logEntry.getTimeUnixNano() != null ?
logEntry.getTimeUnixNano() : System.nanoTime(),
logEntry.getObservedTimeUnixNano() != null ?
logEntry.getObservedTimeUnixNano() : System.nanoTime(),
logEntry.getSeverityNumber(),
logEntry.getSeverityText(),
- JsonUtil.toJson(logEntry.getBody()),
+ logEntry.getBody(),
logEntry.getTraceId(),
logEntry.getSpanId(),
logEntry.getTraceFlags(),
@@ -516,13 +486,9 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
JsonUtil.toJson(logEntry.getInstrumentationScope()),
logEntry.getDroppedAttributesCount()
};
-
table.addRow(values);
-
- // Write to GreptimeDB
CompletableFuture<Result<WriteOk, Err>> writeFuture =
greptimeDb.write(table);
Result<WriteOk, Err> result = writeFuture.get(10,
TimeUnit.SECONDS);
-
if (result.isOk()) {
log.debug("[warehouse greptime-log] Write successful");
} else {
@@ -536,14 +502,16 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long
endTime, String traceId,
String spanId, Integer
severityNumber,
- String severityText) {
+ String severityText,
String searchContent) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM
").append(LOG_TABLE_NAME);
- buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
+ List<Object> args = new ArrayList<>();
+ buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
sql.append(" ORDER BY time_unix_nano DESC");
- List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
- return mapRowsToLogEntries(rows);
+ // Execute via JDBC executor using parameters
+ List<LogEntry> rows =
greptimeSqlQueryExecutor.query(sql.toString(), args.toArray());
+ return rows;
} catch (Exception e) {
log.error("[warehouse greptime-log] queryLogsByMultipleConditions
error: {}", e.getMessage(), e);
return List.of();
@@ -553,22 +521,24 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
String
spanId, Integer severityNumber,
- String
severityText, Integer offset, Integer limit) {
+ String
severityText, String searchContent,
+ Integer
offset, Integer limit) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM
").append(LOG_TABLE_NAME);
- buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
+ List<Object> args = new ArrayList<>();
+ buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
sql.append(" ORDER BY time_unix_nano DESC");
- // Add pagination
if (limit != null && limit > 0) {
- sql.append(" LIMIT ").append(limit);
+ sql.append(" LIMIT ?");
+ args.add(limit);
if (offset != null && offset > 0) {
- sql.append(" OFFSET ").append(offset);
+ sql.append(" OFFSET ?");
+ args.add(offset);
}
}
- List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
- return mapRowsToLogEntries(rows);
+ return greptimeSqlQueryExecutor.query(sql.toString(),
args.toArray());
} catch (Exception e) {
log.error("[warehouse greptime-log]
queryLogsByMultipleConditionsWithPagination error: {}", e.getMessage(), e);
return List.of();
@@ -577,20 +547,14 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
- String spanId, Integer
severityNumber,
- String severityText) {
+ String spanId, Integer
severityNumber,
+ String severityText, String
searchContent) {
try {
StringBuilder sql = new StringBuilder("SELECT COUNT(*) as count
FROM ").append(LOG_TABLE_NAME);
- buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
+ List<Object> args = new ArrayList<>();
+ buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
- List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
- if (rows != null && !rows.isEmpty()) {
- Object countObj = rows.get(0).get("count");
- if (countObj instanceof Number) {
- return ((Number) countObj).longValue();
- }
- }
- return 0;
+ return greptimeSqlQueryExecutor.count(sql.toString(),
args.toArray());
} catch (Exception e) {
log.error("[warehouse greptime-log] countLogsByMultipleConditions
error: {}", e.getMessage(), e);
return 0;
@@ -601,12 +565,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
return ms * 1_000_000L;
}
- private static String safeString(String input) {
- if (input == null) {
- return "";
- }
- return input.replace("'", "''");
- }
/**
* build WHERE conditions
@@ -617,151 +575,57 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
* @param spanId span id
* @param severityNumber severity number
*/
- private void buildWhereConditions(StringBuilder sql, Long startTime, Long
endTime, String traceId,
- String spanId, Integer severityNumber,
String severityText) {
+ private void buildWhereConditions(StringBuilder sql, List<Object> args,
Long startTime, Long endTime,
+ String traceId, String spanId, Integer
severityNumber,
+ String severityText, String
searchContent) {
List<String> conditions = new ArrayList<>();
-
- // Time range condition
if (startTime != null && endTime != null) {
- conditions.add("time_unix_nano >= " + msToNs(startTime) + " AND
time_unix_nano <= " + msToNs(endTime));
+ conditions.add("time_unix_nano >= ? AND time_unix_nano <= ?");
+ args.add(msToNs(startTime));
+ args.add(msToNs(endTime));
}
-
- // TraceId condition
if (StringUtils.hasText(traceId)) {
- conditions.add("trace_id = '" + safeString(traceId) + "'");
+ conditions.add("trace_id = ?");
+ args.add(traceId);
}
-
- // SpanId condition
if (StringUtils.hasText(spanId)) {
- conditions.add("span_id = '" + safeString(spanId) + "'");
+ conditions.add("span_id = ?");
+ args.add(spanId);
}
-
- // Severity condition
if (severityNumber != null) {
- conditions.add("severity_number = " + severityNumber);
+ conditions.add("severity_number = ?");
+ args.add(severityNumber);
}
-
- // SeverityText condition
if (StringUtils.hasText(severityText)) {
- conditions.add("severity_text = '" + safeString(severityText) +
"'");
+ conditions.add("severity_text = ?");
+ args.add(severityText);
+ }
+ if (StringUtils.hasText(searchContent)) {
+ // Using CAST(body AS String) to search within JSON/String content.
+ // GreptimeDB supports PostgreSQL protocol, this syntax is
generally safe.
+ conditions.add("CAST(body AS String) LIKE ?");
+ args.add("%" + searchContent + "%");
}
-
- // Add WHERE clause if there are conditions
if (!conditions.isEmpty()) {
sql.append(" WHERE ").append(String.join(" AND ", conditions));
}
}
- private List<LogEntry> mapRowsToLogEntries(List<Map<String, Object>> rows)
{
- List<LogEntry> list = new LinkedList<>();
- if (rows == null || rows.isEmpty()) {
- return list;
- }
- for (Map<String, Object> row : rows) {
- try {
- LogEntry.InstrumentationScope scope = null;
- Object scopeObj = row.get("instrumentation_scope");
- if (scopeObj instanceof String scopeStr &&
StringUtils.hasText(scopeStr)) {
- try {
- scope = JsonUtil.fromJson(scopeStr,
LogEntry.InstrumentationScope.class);
- } catch (Exception ignore) {
- scope = null;
- }
- }
-
- Object bodyObj = parseJsonMaybe(row.get("body"));
- Map<String, Object> attributes =
castToMap(parseJsonMaybe(row.get("attributes")));
- Map<String, Object> resource =
castToMap(parseJsonMaybe(row.get("resource")));
-
- LogEntry entry = LogEntry.builder()
- .timeUnixNano(castToLong(row.get("time_unix_nano")))
-
.observedTimeUnixNano(castToLong(row.get("observed_time_unix_nano")))
-
.severityNumber(castToInteger(row.get("severity_number")))
- .severityText(castToString(row.get("severity_text")))
- .body(bodyObj)
- .traceId(castToString(row.get("trace_id")))
- .spanId(castToString(row.get("span_id")))
- .traceFlags(castToInteger(row.get("trace_flags")))
- .attributes(attributes)
- .resource(resource)
- .instrumentationScope(scope)
-
.droppedAttributesCount(castToInteger(row.get("dropped_attributes_count")))
- .build();
- list.add(entry);
- } catch (Exception e) {
- log.warn("[warehouse greptime-log] map row to LogEntry error:
{}", e.getMessage());
- }
- }
- return list;
- }
-
- private static Object parseJsonMaybe(Object value) {
- if (value == null) return null;
- if (value instanceof Map) return value;
- if (value instanceof String str) {
- String s = str.trim();
- if ((s.startsWith("{") && s.endsWith("}")) || (s.startsWith("[")
&& s.endsWith("]"))) {
- try {
- return JsonUtil.fromJson(s, Object.class);
- } catch (Exception e) {
- return s;
- }
- }
- return s;
- }
- return value;
- }
-
- @SuppressWarnings("unchecked")
- private static Map<String, Object> castToMap(Object obj) {
- if (obj instanceof Map) {
- return (Map<String, Object>) obj;
- }
- return null;
- }
-
- private static Long castToLong(Object obj) {
- if (obj == null) return null;
- if (obj instanceof Number n) return n.longValue();
- try {
- return Long.parseLong(String.valueOf(obj));
- } catch (Exception e) {
- return null;
- }
- }
-
- private static Integer castToInteger(Object obj) {
- if (obj == null) return null;
- if (obj instanceof Number n) return n.intValue();
- try {
- return Integer.parseInt(String.valueOf(obj));
- } catch (Exception e) {
- return null;
- }
- }
-
- private static String castToString(Object obj) {
- return obj == null ? null : String.valueOf(obj);
- }
-
@Override
public boolean batchDeleteLogs(List<Long> timeUnixNanos) {
if (!isServerAvailable() || timeUnixNanos == null ||
timeUnixNanos.isEmpty()) {
return false;
}
-
try {
StringBuilder sql = new StringBuilder("DELETE FROM
").append(LOG_TABLE_NAME).append(" WHERE time_unix_nano IN (");
- sql.append(timeUnixNanos.stream()
- .filter(time -> time != null)
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- sql.append(")");
+ // Construct placeholders (?,?,?)
+ String placeholders = timeUnixNanos.stream().map(t ->
"?").collect(Collectors.joining(", "));
+ sql.append(placeholders).append(")");
- greptimeSqlQueryExecutor.execute(sql.toString());
+ // Convert list to array for varargs
+ greptimeSqlQueryExecutor.query(sql.toString(),
timeUnixNanos.toArray());
log.info("[warehouse greptime-log] Batch delete executed
successfully for {} logs", timeUnixNanos.size());
return true;
-
} catch (Exception e) {
log.error("[warehouse greptime-log] batchDeleteLogs error: {}",
e.getMessage(), e);
return false;
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
index 7913eec5fd..f356eea5c6 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
@@ -27,11 +27,16 @@ import
org.springframework.boot.context.properties.bind.DefaultValue;
* GrepTimeDB configuration information
*/
@ConfigurationProperties(prefix =
ConfigConstants.FunctionModuleConstants.WAREHOUSE
- + SignConstants.DOT
- + WarehouseConstants.STORE
- + SignConstants.DOT
- + WarehouseConstants.HistoryName.GREPTIME)
-public record GreptimeProperties(@DefaultValue("false") boolean enabled,
- @DefaultValue("127.0.0.1:4001") String grpcEndpoints,
@DefaultValue("http://127.0.0.1:4000") String httpEndpoint,
- @DefaultValue("public") String database, String username, String
password) {
-}
+ + SignConstants.DOT
+ + WarehouseConstants.STORE
+ + SignConstants.DOT
+ + WarehouseConstants.HistoryName.GREPTIME)
+public record GreptimeProperties(
+ @DefaultValue("false") boolean enabled,
+ @DefaultValue("127.0.0.1:4001") String grpcEndpoints,
+ @DefaultValue("http://127.0.0.1:4000") String httpEndpoint,
+ @DefaultValue("127.0.0.1:4003") String postgresEndpoint,
+ @DefaultValue("public") String database,
+ String username,
+ String password) {
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
index 99f83855ae..9a518c8a54 100644
---
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,26 +21,23 @@ package org.apache.hertzbeat.warehouse.db;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
+import org.apache.hertzbeat.common.entity.log.LogEntry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.client.RestTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
/**
* Test case for {@link GreptimeSqlQueryExecutor}
@@ -49,36 +46,23 @@ import org.springframework.web.client.RestTemplate;
class GreptimeSqlQueryExecutorTest {
@Mock
- private GreptimeProperties greptimeProperties;
-
- @Mock
- private RestTemplate restTemplate;
+ private JdbcTemplate jdbcTemplate;
private GreptimeSqlQueryExecutor greptimeSqlQueryExecutor;
@BeforeEach
void setUp() {
-
when(greptimeProperties.httpEndpoint()).thenReturn("http://127.0.0.1:4000");
- when(greptimeProperties.database()).thenReturn("hertzbeat");
- when(greptimeProperties.username()).thenReturn("username");
- when(greptimeProperties.password()).thenReturn("password");
-
- greptimeSqlQueryExecutor = new
GreptimeSqlQueryExecutor(greptimeProperties, restTemplate);
+ // Use the constructor capable of dependency injection for mocking
+ greptimeSqlQueryExecutor = new GreptimeSqlQueryExecutor(jdbcTemplate);
}
@Test
void testExecuteSuccess() {
- // Mock successful response
- GreptimeSqlQueryContent mockResponse = createMockResponse();
- ResponseEntity<GreptimeSqlQueryContent> responseEntity =
- new ResponseEntity<>(mockResponse, HttpStatus.OK);
-
- when(restTemplate.exchange(
- any(String.class),
- eq(HttpMethod.POST),
- any(HttpEntity.class),
- eq(GreptimeSqlQueryContent.class)
- )).thenReturn(responseEntity);
+ // Mock successful response for queryForList
+ List<Map<String, Object>> mockResult = new ArrayList<>();
+ mockResult.add(Map.of("metric_name", "cpu", "value", 85.5));
+
+
when(jdbcTemplate.queryForList(any(String.class))).thenReturn(mockResult);
// Execute
List<Map<String, Object>> result =
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics");
@@ -93,48 +77,41 @@ class GreptimeSqlQueryExecutorTest {
@Test
void testExecuteError() {
// Mock error response
- when(restTemplate.exchange(
- any(String.class),
- eq(HttpMethod.POST),
- any(HttpEntity.class),
- eq(GreptimeSqlQueryContent.class)
- )).thenThrow(new RuntimeException("Connection error"));
-
- // Execute
- List<Map<String, Object>> result =
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics");
+ when(jdbcTemplate.queryForList(any(String.class))).thenThrow(new
RuntimeException("Connection error"));
- // Verify returns empty list on error
- assertNotNull(result);
- assertTrue(result.isEmpty());
+ // Execute and verify exception
+ assertThrows(RuntimeException.class, () ->
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics"));
}
- private GreptimeSqlQueryContent createMockResponse() {
- GreptimeSqlQueryContent response = new GreptimeSqlQueryContent();
- response.setCode(0);
+ @Test
+ void testQuerySuccess() {
+ // Mock success response for query (using RowMapper)
+ List<LogEntry> mockLogs =
List.of(LogEntry.builder().traceId("123").build());
+ when(jdbcTemplate.query(any(String.class), any(RowMapper.class),
any(Object[].class)))
+ .thenReturn(mockLogs);
- // Create simple schema
- List<GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema>
columnSchemas = new ArrayList<>();
- columnSchemas.add(new
GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema("metric_name",
"String"));
- columnSchemas.add(new
GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema("value", "Float64"));
+ // Execute
+ List<LogEntry> result = greptimeSqlQueryExecutor.query("SELECT * FROM
logs WHERE id = ?", 1);
- GreptimeSqlQueryContent.Output.Records.Schema schema =
- new GreptimeSqlQueryContent.Output.Records.Schema();
- schema.setColumnSchemas(columnSchemas);
+ // Verify
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertEquals("123", result.get(0).getTraceId());
- // Create simple row
- List<List<Object>> rows = new ArrayList<>();
- rows.add(List.of("cpu", 85.5));
+ // Verify args passed
+ verify(jdbcTemplate).query(eq("SELECT * FROM logs WHERE id = ?"),
any(RowMapper.class), eq(1));
+ }
- // Build response structure
- GreptimeSqlQueryContent.Output.Records records =
- new GreptimeSqlQueryContent.Output.Records();
- records.setSchema(schema);
- records.setRows(rows);
+ @Test
+ void testCountSuccess() {
+ // Mock success response for queryForObject (count)
+ when(jdbcTemplate.queryForObject(any(String.class), eq(Long.class),
any(Object[].class)))
+ .thenReturn(10L);
- GreptimeSqlQueryContent.Output output = new
GreptimeSqlQueryContent.Output();
- output.setRecords(records);
+ // Execute
+ Long count = greptimeSqlQueryExecutor.count("SELECT COUNT(*) FROM
logs");
- response.setOutput(List.of(output));
- return response;
+ // Verify
+ assertEquals(10L, count);
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
index 45a1234c11..4b4a825608 100644
---
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
@@ -82,7 +82,7 @@ class GreptimeDbDataStorageTest {
@Mock
private GreptimeDB greptimeDb;
-
+
private GreptimeDbDataStorage greptimeDbDataStorage;
@BeforeEach
@@ -120,14 +120,14 @@ class GreptimeDbDataStorageTest {
void testSaveData() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
-
+
// Mock the write result
@SuppressWarnings("unchecked")
Result<WriteOk, Err> mockResult = mock(Result.class);
when(mockResult.isOk()).thenReturn(true);
CompletableFuture<Result<WriteOk, Err>> mockFuture =
CompletableFuture.completedFuture(mockResult);
when(greptimeDb.write(any(Table.class))).thenReturn(mockFuture);
-
+
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
// Test with valid metrics data
@@ -153,7 +153,7 @@ class GreptimeDbDataStorageTest {
@Test
void testGetHistoryMetricData() {
greptimeDbDataStorage = new GreptimeDbDataStorage(greptimeProperties,
restTemplate, greptimeSqlQueryExecutor);
-
+
PromQlQueryContent content = createMockPromQlQueryContent();
ResponseEntity<PromQlQueryContent> responseEntity = new
ResponseEntity<>(content, HttpStatus.OK);
@@ -172,14 +172,14 @@ class GreptimeDbDataStorageTest {
void testSaveLogData() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
-
+
// Mock the write result
@SuppressWarnings("unchecked")
Result<WriteOk, Err> mockResult = mock(Result.class);
when(mockResult.isOk()).thenReturn(true);
CompletableFuture<Result<WriteOk, Err>> mockFuture =
CompletableFuture.completedFuture(mockResult);
when(greptimeDb.write(any(Table.class))).thenReturn(mockFuture);
-
+
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
LogEntry logEntry = createMockLogEntry();
@@ -194,51 +194,61 @@ class GreptimeDbDataStorageTest {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
- List<Map<String, Object>> mockLogRows = createMockLogRows();
-
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(mockLogRows);
-
+
+ // Mock list return for query
+ List<LogEntry> mockLogs = List.of(createMockLogEntry());
+
+ // The query passes 7 arguments: start, end, traceId, spanId,
severityNumber, severityText, content
+ when(greptimeSqlQueryExecutor.query(anyString(), any(), any(),
any(), any(), any(), any(), any()))
+ .thenReturn(mockLogs);
+
// Test basic query
List<LogEntry> result =
greptimeDbDataStorage.queryLogsByMultipleConditions(
- System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), "trace123", "span456", 1, "INFO"
+ System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), "trace123", "span456", 1, "INFO", "content"
);
assertNotNull(result);
assertEquals(1, result.size());
assertEquals("trace123", result.get(0).getTraceId());
- // Test count query
- List<Map<String, Object>> mockCountResult =
List.of(Map.of("count", 5L));
-
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(mockCountResult);
+ // Mock count return
+ // The count query passes 2 arguments: start, end (other fields
are null)
+ when(greptimeSqlQueryExecutor.count(anyString(), any(),
any())).thenReturn(5L);
long count = greptimeDbDataStorage.countLogsByMultipleConditions(
- System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), null, null, null, null
+ System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), null, null, null, null, null
);
assertEquals(5L, count);
-
+
// Test count query with executor error
- when(greptimeSqlQueryExecutor.execute(anyString())).thenThrow(new
RuntimeException("Database error"));
- long errorCount =
greptimeDbDataStorage.countLogsByMultipleConditions(System.currentTimeMillis()
- 3600000, System.currentTimeMillis(), null, null, null, null);
+ when(greptimeSqlQueryExecutor.count(anyString(), any(),
any())).thenThrow(new RuntimeException("Database error"));
+ long errorCount =
greptimeDbDataStorage.countLogsByMultipleConditions(System.currentTimeMillis()
- 3600000, System.currentTimeMillis(), null, null, null, null, null);
assertEquals(0L, errorCount);
}
}
-
+
@Test
void testQueryLogsWithPagination() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
-
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(createMockLogRows());
-
+
+ List<LogEntry> mockLogs = List.of(createMockLogEntry());
+ // The query passes 4 arguments: start, end, limit, offset
+ when(greptimeSqlQueryExecutor.query(anyString(), any(), any(),
any(), any())).thenReturn(mockLogs);
+
ArgumentCaptor<String> sqlCaptor =
ArgumentCaptor.forClass(String.class);
greptimeDbDataStorage.queryLogsByMultipleConditionsWithPagination(
System.currentTimeMillis() - 3600000,
System.currentTimeMillis(),
- null, null, null, null, 1, 10
+ null, null, null, null, null, 1, 10
);
- verify(greptimeSqlQueryExecutor).execute(sqlCaptor.capture());
+ // Verify that the query method was called with the SQL containing
LIMIT and OFFSET
+ // And 4 parameters
+ verify(greptimeSqlQueryExecutor).query(sqlCaptor.capture(), any(),
any(), any(), any());
String capturedSql = sqlCaptor.getValue();
-
- assertTrue(capturedSql.toLowerCase().contains("limit 10"));
- assertTrue(capturedSql.toLowerCase().contains("offset 1"));
+
+ assertTrue(capturedSql.toLowerCase().contains("limit ?"));
+ assertTrue(capturedSql.toLowerCase().contains("offset ?"));
}
}
@@ -252,7 +262,8 @@ class GreptimeDbDataStorageTest {
// Test with valid list
boolean result = greptimeDbDataStorage.batchDeleteLogs(List.of(1L,
2L));
assertTrue(result);
- verify(greptimeSqlQueryExecutor, times(1)).execute(anyString());
+ // Verify query method is called with 2 arguments (for 2 IDs)
+ verify(greptimeSqlQueryExecutor, times(1)).query(anyString(),
eq(1L), eq(2L));
}
}
@@ -265,7 +276,8 @@ class GreptimeDbDataStorageTest {
// Test with empty list
boolean emptyResult =
greptimeDbDataStorage.batchDeleteLogs(Collections.emptyList());
assertFalse(emptyResult);
- verify(greptimeSqlQueryExecutor, never()).execute(anyString());
+ // Verify query was never called
+ verify(greptimeSqlQueryExecutor, never()).query(anyString(),
any());
}
}
@@ -278,7 +290,8 @@ class GreptimeDbDataStorageTest {
// Test with null list
boolean nullResult = greptimeDbDataStorage.batchDeleteLogs(null);
assertFalse(nullResult);
- verify(greptimeSqlQueryExecutor, never()).execute(anyString());
+ // Verify query was never called
+ verify(greptimeSqlQueryExecutor, never()).query(anyString(),
any());
}
}
@@ -299,7 +312,7 @@ class GreptimeDbDataStorageTest {
lenient().when(mockMetricsData.getCode()).thenReturn(CollectRep.Code.SUCCESS);
lenient().when(mockMetricsData.getMetrics()).thenReturn("cpu");
lenient().when(mockMetricsData.getId()).thenReturn(1L);
-
+
if (!hasValues) {
lenient().when(mockMetricsData.getValues()).thenReturn(Collections.emptyList());
return mockMetricsData;
@@ -311,36 +324,36 @@ class GreptimeDbDataStorageTest {
lenient().when(mockField1.getName()).thenReturn("usage");
lenient().when(mockField1.getLabel()).thenReturn(false);
lenient().when(mockField1.getType()).thenReturn((int)
CommonConstants.TYPE_NUMBER);
-
+
CollectRep.Field mockField2 = mock(CollectRep.Field.class);
lenient().when(mockField2.getName()).thenReturn("instance");
lenient().when(mockField2.getLabel()).thenReturn(true);
lenient().when(mockField2.getType()).thenReturn((int)
CommonConstants.TYPE_STRING);
-
+
lenient().when(mockMetricsData.getFields()).thenReturn(List.of(mockField1,
mockField2));
-
+
// Create ValueRow mock
CollectRep.ValueRow mockValueRow = mock(CollectRep.ValueRow.class);
lenient().when(mockValueRow.getColumnsList()).thenReturn(List.of("server1",
"85.5"));
-
+
lenient().when(mockMetricsData.getValues()).thenReturn(List.of(mockValueRow));
// Mock RowWrapper for readRow()
RowWrapper mockRowWrapper = mock(RowWrapper.class);
lenient().when(mockRowWrapper.hasNextRow()).thenReturn(true, false);
lenient().when(mockRowWrapper.nextRow()).thenReturn(mockRowWrapper);
-
+
// Mock cell stream
ArrowCell mockCell1 = mock(ArrowCell.class);
lenient().when(mockCell1.getValue()).thenReturn("85.5");
lenient().when(mockCell1.getMetadataAsBoolean(any())).thenReturn(false);
lenient().when(mockCell1.getMetadataAsByte(any())).thenReturn(CommonConstants.TYPE_NUMBER);
-
+
ArrowCell mockCell2 = mock(ArrowCell.class);
lenient().when(mockCell2.getValue()).thenReturn("server1");
lenient().when(mockCell2.getMetadataAsBoolean(any())).thenReturn(true);
lenient().when(mockCell2.getMetadataAsByte(any())).thenReturn(CommonConstants.TYPE_STRING);
-
+
lenient().when(mockRowWrapper.cellStream()).thenReturn(java.util.stream.Stream.of(mockCell1,
mockCell2));
lenient().when(mockMetricsData.readRow()).thenReturn(mockRowWrapper);
@@ -376,15 +389,4 @@ class GreptimeDbDataStorageTest {
.spanId("span456")
.build();
}
-
- private List<Map<String, Object>> createMockLogRows() {
- Map<String, Object> row = new HashMap<>();
- row.put("time_unix_nano", System.nanoTime());
- row.put("severity_text", "INFO");
- row.put("body", "\"Test log message\"");
- row.put("trace_id", "trace123");
- row.put("span_id", "span456");
-
- return List.of(row);
- }
}
\ No newline at end of file
diff --git a/script/application.yml b/script/application.yml
index e141b4606c..8d36940e2c 100644
--- a/script/application.yml
+++ b/script/application.yml
@@ -209,6 +209,7 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
+ postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git
a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
index 1bfe96d105..ea8347f19b 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
@@ -149,6 +149,7 @@ warehouse:
enabled: true
grpc-endpoints: greptime:4001
http-endpoint: http://greptime:4000
+ postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git a/web-app/src/app/routes/log/log-manage/log-manage.component.html
b/web-app/src/app/routes/log/log-manage/log-manage.component.html
index cb95696ac9..3ee454cd9d 100644
--- a/web-app/src/app/routes/log/log-manage/log-manage.component.html
+++ b/web-app/src/app/routes/log/log-manage/log-manage.component.html
@@ -5,9 +5,9 @@
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,9 +23,7 @@
<nz-divider></nz-divider>
-<!-- Combined Filter and Statistics Card -->
<nz-card [nzTitle]="'log.manage.title' | i18n" class="manager-card">
- <!-- Filters Section -->
<div class="filters-container">
<nz-space nzSize="middle" nzWrap>
<nz-range-picker *nzSpaceItem [(ngModel)]="timeRange" nzShowTime
nzFormat="yyyy-MM-dd HH:mm:ss"></nz-range-picker>
@@ -56,6 +54,7 @@
<nz-auto-option nzValue="ERROR">ERROR</nz-auto-option>
<nz-auto-option nzValue="FATAL">FATAL</nz-auto-option>
</nz-autocomplete>
+ <input *nzSpaceItem nz-input [placeholder]="'log.manage.search' | i18n"
[(ngModel)]="searchContent" style="width: 200px" />
<button *nzSpaceItem nz-button nzType="primary" (click)="query()">
<i nz-icon nzType="search"></i> {{ 'log.manage.search' | i18n }}
</button>
@@ -101,9 +100,7 @@
</ng-template>
</div>
- <!-- Statistics Section (Collapsible) -->
<div *ngIf="showStatistics">
- <!-- Statistics Overview -->
<div nz-row [nzGutter]="16" style="margin-bottom: 16px">
<div nz-col nzXs="12" nzSm="4">
<nz-card>
@@ -179,7 +176,6 @@
</div>
</div>
- <!-- Charts Section -->
<div nz-row [nzGutter]="16">
<div nz-col nzXs="24" nzMd="8">
<nz-card [nzTitle]="'log.manage.chart.severity-distribution' | i18n"
[nzSize]="'small'">
@@ -200,7 +196,6 @@
</div>
</nz-card>
-<!-- Log Table -->
<nz-table
#fixedTable
[nzData]="data"
@@ -319,7 +314,6 @@
</tbody>
</nz-table>
-<!-- Log Details Modal -->
<nz-modal
[(nzVisible)]="isModalVisible"
[nzTitle]="'log.manage.log-entry-details' | i18n"
@@ -329,7 +323,6 @@
>
<ng-container *nzModalContent>
<div *ngIf="selectedLogEntry" class="log-details-modal">
- <!-- Basic Info Section -->
<nz-card [nzTitle]="'log.manage.basic-information' | i18n"
[nzSize]="'small'" class="basic-info-card">
<div class="basic-info">
<div class="info-row">
@@ -353,7 +346,6 @@
</div>
</nz-card>
- <!-- JSON Details Section -->
<nz-card [nzTitle]="'log.manage.complete-json-data' | i18n"
[nzSize]="'small'">
<div class="json-content">
<button
diff --git a/web-app/src/app/routes/log/log-manage/log-manage.component.ts
b/web-app/src/app/routes/log/log-manage/log-manage.component.ts
index 1d448931d1..496161769b 100644
--- a/web-app/src/app/routes/log/log-manage/log-manage.component.ts
+++ b/web-app/src/app/routes/log/log-manage/log-manage.component.ts
@@ -92,6 +92,7 @@ export class LogManageComponent implements OnInit {
severityText?: string;
traceId: string = '';
spanId: string = '';
+ searchContent: string = '';
// table with pagination
loading = false;
@@ -302,6 +303,7 @@ export class LogManageComponent implements OnInit {
this.spanId,
this.severityNumber,
this.severityText,
+ this.searchContent,
this.pageIndex - 1,
this.pageSize
);
@@ -339,8 +341,9 @@ export class LogManageComponent implements OnInit {
const spanId = this.spanId || undefined;
const severity = this.severityNumber || undefined;
const severityText = this.severityText || undefined;
+ const search = this.searchContent || undefined;
- this.logSvc.overviewStats(start, end, traceId, spanId, severity, severityText).subscribe({
+ this.logSvc.overviewStats(start, end, traceId, spanId, severity, severityText, search).subscribe({
next: message => {
if (message.code === 0) {
this.overviewStats = message.data || {};
@@ -349,7 +352,7 @@ export class LogManageComponent implements OnInit {
}
});
- this.logSvc.traceCoverageStats(start, end, traceId, spanId, severity, severityText).subscribe({
+ this.logSvc.traceCoverageStats(start, end, traceId, spanId, severity, severityText, search).subscribe({
next: message => {
if (message.code === 0) {
this.refreshTraceCoverageChart(message.data || {});
@@ -357,7 +360,7 @@ export class LogManageComponent implements OnInit {
}
});
- this.logSvc.trendStats(start, end, traceId, spanId, severity, severityText).subscribe({
+ this.logSvc.trendStats(start, end, traceId, spanId, severity, severityText, search).subscribe({
next: message => {
if (message.code === 0) {
this.refreshTrendChart(message.data?.hourlyStats || {});
@@ -372,6 +375,7 @@ export class LogManageComponent implements OnInit {
this.traceId = '';
this.spanId = '';
this.severityText = '';
+ this.searchContent = '';
this.pageIndex = 1;
this.query();
}
diff --git a/web-app/src/app/service/log.service.ts b/web-app/src/app/service/log.service.ts
index 2a4a0a19aa..0f5b70ea7d 100644
--- a/web-app/src/app/service/log.service.ts
+++ b/web-app/src/app/service/log.service.ts
@@ -43,6 +43,7 @@ export class LogService {
spanId?: string,
severityNumber?: number,
severityText?: string,
+ search?: string,
pageIndex: number = 0,
pageSize: number = 20
): Observable<Message<Page<LogEntry>>> {
@@ -53,6 +54,7 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber', severityNumber);
if (severityText) params = params.set('severityText', severityText);
+ if (search) params = params.set('search', search);
params = params.set('pageIndex', pageIndex);
params = params.set('pageSize', pageSize);
return this.http.get<Message<any>>(logs_list_uri, { params });
@@ -64,7 +66,8 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string
+ severityText?: string,
+ search?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -73,6 +76,7 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber', severityNumber);
if (severityText) params = params.set('severityText', severityText);
+ if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_overview_uri, { params });
}
@@ -82,7 +86,8 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string
+ severityText?: string,
+ search?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -91,6 +96,7 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber', severityNumber);
if (severityText) params = params.set('severityText', severityText);
+ if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_trend_uri, { params });
}
@@ -100,7 +106,8 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string
+ severityText?: string,
+ search?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -109,6 +116,7 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber', severityNumber);
if (severityText) params = params.set('severityText', severityText);
+ if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_trace_coverage_uri, { params });
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]