This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 8e8ab80aa0 Revert "[refractor] Using Spring jdbc to query Greptime
log" (#3885)
8e8ab80aa0 is described below
commit 8e8ab80aa0804877c1ca9074993400f59efbd2ec
Author: Yang Chen <[email protected]>
AuthorDate: Wed Dec 3 12:32:45 2025 +0800
Revert "[refractor] Using Spring jdbc to query Greptime log" (#3885)
---
.../log/alert/LogPeriodicAlertE2eTest.java | 15 +-
.../log/storage/GreptimeLogStorageE2eTest.java | 23 +-
.../log/controller/LogQueryController.java | 161 ++++++-----
.../log/controller/LogQueryControllerTest.java | 111 ++++----
.../src/main/resources/application-test.yml | 1 -
.../src/main/resources/application.yml | 1 -
hertzbeat-warehouse/pom.xml | 25 +-
.../warehouse/db/GreptimeSqlQueryExecutor.java | 234 +++++-----------
.../store/history/tsdb/HistoryDataReader.java | 34 +--
.../tsdb/greptime/GreptimeDbDataStorage.java | 294 +++++++++++++++------
.../history/tsdb/greptime/GreptimeProperties.java | 21 +-
.../warehouse/db/GreptimeSqlQueryExecutorTest.java | 107 +++++---
.../tsdb/greptime/GreptimeDbDataStorageTest.java | 96 ++++---
script/application.yml | 1 -
.../conf/application.yml | 1 -
.../log/log-manage/log-manage.component.html | 14 +-
.../routes/log/log-manage/log-manage.component.ts | 10 +-
web-app/src/app/service/log.service.ts | 14 +-
18 files changed, 589 insertions(+), 574 deletions(-)
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
index 8628d5b72f..e3c387dbe6 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/alert/LogPeriodicAlertE2eTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -70,7 +70,6 @@ public class LogPeriodicAlertE2eTest {
private static final String GREPTIME_IMAGE = "greptime/greptimedb:latest";
private static final int GREPTIME_HTTP_PORT = 4000;
private static final int GREPTIME_GRPC_PORT = 4001;
- private static final int GREPTIME_PG_PORT = 4003;
private static final Duration CONTAINER_STARTUP_TIMEOUT =
Duration.ofSeconds(120);
@LocalServerPort
@@ -86,17 +85,16 @@ public class LogPeriodicAlertE2eTest {
private AlarmCommonReduce alarmCommonReduce;
static GenericContainer<?> vector;
-
+
static GenericContainer<?> greptimedb;
static {
greptimedb = new
GenericContainer<>(DockerImageName.parse(GREPTIME_IMAGE))
- .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT,
GREPTIME_PG_PORT)
+ .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT)
.withCommand("standalone", "start",
"--http-addr", "0.0.0.0:" + GREPTIME_HTTP_PORT,
- "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT,
- "--postgres-addr", "0.0.0.0:" + GREPTIME_PG_PORT)
- .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT, GREPTIME_PG_PORT))
+ "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT)
+ .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT))
.withStartupTimeout(CONTAINER_STARTUP_TIMEOUT);
greptimedb.start();
}
@@ -108,7 +106,6 @@ public class LogPeriodicAlertE2eTest {
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
- r.add("warehouse.store.greptime.postgres-endpoint", () -> "localhost:"
+ greptimedb.getMappedPort(GREPTIME_PG_PORT));
r.add("warehouse.store.greptime.username", () -> "");
r.add("warehouse.store.greptime.password", () -> "");
}
@@ -230,4 +227,4 @@ public class LogPeriodicAlertE2eTest {
errorLabelsByIndividual.put("type", "error_count");
errorCountAlertByIndividual.setLabels(errorLabelsByIndividual);
}
-}
\ No newline at end of file
+}
diff --git
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
index f491d8562f..4aba2f1eb2 100644
---
a/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
+++
b/hertzbeat-e2e/hertzbeat-log-e2e/src/test/java/org/apache/hertzbeat/log/storage/GreptimeLogStorageE2eTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -59,7 +59,6 @@ public class GreptimeLogStorageE2eTest {
private static final String GREPTIME_IMAGE = "greptime/greptimedb:latest";
private static final int GREPTIME_HTTP_PORT = 4000;
private static final int GREPTIME_GRPC_PORT = 4001;
- private static final int GREPTIME_PG_PORT = 4003;
private static final Duration CONTAINER_STARTUP_TIMEOUT =
Duration.ofSeconds(120);
@LocalServerPort
@@ -76,12 +75,11 @@ public class GreptimeLogStorageE2eTest {
static {
greptimedb = new
GenericContainer<>(DockerImageName.parse(GREPTIME_IMAGE))
- .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT,
GREPTIME_PG_PORT)
+ .withExposedPorts(GREPTIME_HTTP_PORT, GREPTIME_GRPC_PORT)
.withCommand("standalone", "start",
"--http-addr", "0.0.0.0:" + GREPTIME_HTTP_PORT,
- "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT,
- "--postgres-addr", "0.0.0.0:" + GREPTIME_PG_PORT)
- .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT, GREPTIME_PG_PORT))
+ "--rpc-bind-addr", "0.0.0.0:" + GREPTIME_GRPC_PORT)
+ .waitingFor(Wait.forListeningPorts(GREPTIME_HTTP_PORT,
GREPTIME_GRPC_PORT))
.withStartupTimeout(CONTAINER_STARTUP_TIMEOUT);
greptimedb.start();
}
@@ -92,7 +90,6 @@ public class GreptimeLogStorageE2eTest {
r.add("warehouse.store.greptime.enabled", () -> "true");
r.add("warehouse.store.greptime.http-endpoint", () ->
"http://localhost:" + greptimedb.getMappedPort(GREPTIME_HTTP_PORT));
r.add("warehouse.store.greptime.grpc-endpoints", () -> "localhost:" +
greptimedb.getMappedPort(GREPTIME_GRPC_PORT));
- r.add("warehouse.store.greptime.postgres-endpoint", () -> "localhost:"
+ greptimedb.getMappedPort(GREPTIME_PG_PORT));
r.add("warehouse.store.greptime.username", () -> "");
r.add("warehouse.store.greptime.password", () -> "");
}
@@ -119,7 +116,7 @@ public class GreptimeLogStorageE2eTest {
void testLogStorageToGreptimeDb() {
List<LogEntry> capturedLogs = new ArrayList<>();
-
+
// Wait for Vector to generate and send logs to HertzBeat
await().atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(3))
@@ -134,7 +131,7 @@ public class GreptimeLogStorageE2eTest {
Thread.currentThread().interrupt();
throw new RuntimeException("Test interrupted", e);
}
-
+
// Assert that we have captured at least some logs
assertFalse(capturedLogs.isEmpty(), "Should have captured
at least one log entry");
});
@@ -145,7 +142,7 @@ public class GreptimeLogStorageE2eTest {
assertNotNull(firstLog, "First log should not be null");
assertNotNull(firstLog.getBody(), "Log body should not be null");
assertNotNull(firstLog.getSeverityText(), "Severity text should not be
null");
-
+
// Additional wait to ensure logs are persisted to GreptimeDB
await().atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
@@ -162,8 +159,8 @@ public class GreptimeLogStorageE2eTest {
private List<LogEntry> queryStoredLogs() {
long endTime = System.currentTimeMillis();
long startTime = endTime - Duration.ofMinutes(5).toMillis(); // Look
back 5 minutes
-
+
return greptimeDbDataStorage.queryLogsByMultipleConditions(
- startTime, endTime, null, null, null, null, null);
+ startTime, endTime, null, null, null, null);
}
-}
\ No newline at end of file
+}
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
index 5d0bed1f45..2de4de5ab2 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/controller/LogQueryController.java
@@ -61,54 +61,50 @@ public class LogQueryController {
}
@GetMapping("/list")
- @Operation(summary = "Query logs by time range with optional filters",
- description = "Query logs by [start,end] in ms and optional
filters with pagination. Returns paginated log entries sorted by timestamp in
descending order.")
+ @Operation(summary = "Query logs by time range with optional filters",
+ description = "Query logs by [start,end] in ms and optional
filters with pagination. Returns paginated log entries sorted by timestamp in
descending order.")
public ResponseEntity<Message<Page<LogEntry>>> list(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
@RequestParam(value = "severityText", required = false) String
severityText,
- @Parameter(description = "Log content search keyword", example =
"error")
- @RequestParam(value = "search", required = false) String search,
- @Parameter(description = "Page index starting from 0", example =
"0")
+ @Parameter(description = "Page index starting from 0", example =
"0")
@RequestParam(value = "pageIndex", required = false, defaultValue
= "0") Integer pageIndex,
- @Parameter(description = "Number of items per page", example =
"20")
+ @Parameter(description = "Number of items per page", example =
"20")
@RequestParam(value = "pageSize", required = false, defaultValue =
"20") Integer pageSize) {
- Page<LogEntry> result = getPagedLogs(start, end, traceId, spanId,
severityNumber, severityText, search, pageIndex, pageSize);
+ Page<LogEntry> result = getPagedLogs(start, end, traceId, spanId,
severityNumber, severityText, pageIndex, pageSize);
return ResponseEntity.ok(Message.success(result));
}
@GetMapping("/stats/overview")
- @Operation(summary = "Log overview statistics",
- description = "Overall counts and basic statistics with filters.
Provides counts by severity levels according to OpenTelemetry standard.")
+ @Operation(summary = "Log overview statistics",
+ description = "Overall counts and basic statistics with
filters. Provides counts by severity levels according to OpenTelemetry
standard.")
public ResponseEntity<Message<Map<String, Object>>> overviewStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText,
- @Parameter(description = "Log content search keyword", example =
"error")
- @RequestParam(value = "search", required = false) String search) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
+
Map<String, Object> overview = new HashMap<>();
overview.put("totalCount", logs.size());
-
+
// Count by severity levels according to OpenTelemetry standard
// TRACE: 1-4, DEBUG: 5-8, INFO: 9-12, WARN: 13-16, ERROR: 17-20,
FATAL: 21-24
long fatalCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 21 && log.getSeverityNumber() <=
24).count();
@@ -117,117 +113,114 @@ public class LogQueryController {
long infoCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 9 && log.getSeverityNumber() <=
12).count();
long debugCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 5 && log.getSeverityNumber() <=
8).count();
long traceCount = logs.stream().filter(log -> log.getSeverityNumber()
!= null && log.getSeverityNumber() >= 1 && log.getSeverityNumber() <=
4).count();
-
+
overview.put("fatalCount", fatalCount);
overview.put("errorCount", errorCount);
overview.put("warnCount", warnCount);
overview.put("infoCount", infoCount);
overview.put("debugCount", debugCount);
overview.put("traceCount", traceCount);
-
+
return ResponseEntity.ok(Message.success(overview));
}
@GetMapping("/stats/trace-coverage")
- @Operation(summary = "Trace coverage statistics",
- description = "Statistics about trace information availability.
Shows how many logs have trace IDs, span IDs, or both for distributed tracing
analysis.")
+ @Operation(summary = "Trace coverage statistics",
+ description = "Statistics about trace information availability.
Shows how many logs have trace IDs, span IDs, or both for distributed tracing
analysis.")
public ResponseEntity<Message<Map<String, Object>>> traceCoverageStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText,
- @Parameter(description = "Log content search keyword", example =
"error")
- @RequestParam(value = "search", required = false) String search) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
+
Map<String, Object> result = new HashMap<>();
-
+
// Trace coverage statistics
long withTraceId = logs.stream().filter(log -> log.getTraceId() !=
null && !log.getTraceId().isEmpty()).count();
long withSpanId = logs.stream().filter(log -> log.getSpanId() != null
&& !log.getSpanId().isEmpty()).count();
- long withBothTraceAndSpan = logs.stream().filter(log ->
- log.getTraceId() != null && !log.getTraceId().isEmpty()
- && log.getSpanId() != null &&
!log.getSpanId().isEmpty()).count();
+ long withBothTraceAndSpan = logs.stream().filter(log ->
+ log.getTraceId() != null && !log.getTraceId().isEmpty()
+ && log.getSpanId() != null &&
!log.getSpanId().isEmpty()).count();
long withoutTrace = logs.size() - withTraceId;
-
+
Map<String, Long> traceCoverage = new HashMap<>();
traceCoverage.put("withTrace", withTraceId);
traceCoverage.put("withoutTrace", withoutTrace);
traceCoverage.put("withSpan", withSpanId);
traceCoverage.put("withBothTraceAndSpan", withBothTraceAndSpan);
-
+
result.put("traceCoverage", traceCoverage);
return ResponseEntity.ok(Message.success(result));
}
@GetMapping("/stats/trend")
- @Operation(summary = "Log trend over time",
- description = "Count logs by hour intervals with filters. Groups
logs by hour and provides time-series data for trend analysis.")
+ @Operation(summary = "Log trend over time",
+ description = "Count logs by hour intervals with filters.
Groups logs by hour and provides time-series data for trend analysis.")
public ResponseEntity<Message<Map<String, Object>>> trendStats(
- @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
+ @Parameter(description = "Start timestamp in milliseconds (Unix
timestamp)", example = "1640995200000")
@RequestParam(value = "start", required = false) Long start,
- @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
+ @Parameter(description = "End timestamp in milliseconds (Unix
timestamp)", example = "1641081600000")
@RequestParam(value = "end", required = false) Long end,
- @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
+ @Parameter(description = "Trace ID for distributed tracing",
example = "1234567890abcdef")
@RequestParam(value = "traceId", required = false) String traceId,
- @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
+ @Parameter(description = "Span ID for distributed tracing",
example = "abcdef1234567890")
@RequestParam(value = "spanId", required = false) String spanId,
- @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
+ @Parameter(description = "Log severity number (1-24 according to
OpenTelemetry standard)", example = "9")
@RequestParam(value = "severityNumber", required = false) Integer
severityNumber,
- @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
- @RequestParam(value = "severityText", required = false) String
severityText,
- @Parameter(description = "Log content search keyword", example =
"error")
- @RequestParam(value = "search", required = false) String search) {
- List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText, search);
-
+ @Parameter(description = "Log severity text (TRACE, DEBUG, INFO,
WARN, ERROR, FATAL)", example = "INFO")
+ @RequestParam(value = "severityText", required = false) String
severityText) {
+ List<LogEntry> logs = getFilteredLogs(start, end, traceId, spanId,
severityNumber, severityText);
+
// Group by hour
Map<String, Long> hourlyStats = logs.stream()
- .filter(log -> log.getTimeUnixNano() != null)
- .collect(Collectors.groupingBy(
- log -> {
- long timestampMs = log.getTimeUnixNano() /
1_000_000L;
- LocalDateTime dateTime = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(timestampMs),
- ZoneId.systemDefault());
- return
dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00"));
- },
- Collectors.counting()));
-
+ .filter(log -> log.getTimeUnixNano() != null)
+ .collect(Collectors.groupingBy(
+ log -> {
+ long timestampMs = log.getTimeUnixNano() / 1_000_000L;
+ LocalDateTime dateTime = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(timestampMs),
+ ZoneId.systemDefault());
+ return
dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00"));
+ },
+ Collectors.counting()));
+
Map<String, Object> result = new HashMap<>();
result.put("hourlyStats", hourlyStats);
return ResponseEntity.ok(Message.success(result));
}
- private List<LogEntry> getFilteredLogs(Long start, Long end, String
traceId, String spanId,
- Integer severityNumber, String
severityText, String search) {
+ private List<LogEntry> getFilteredLogs(Long start, Long end, String
traceId, String spanId,
+ Integer severityNumber, String
severityText) {
// Use the new multi-condition query method
- return historyDataReader.queryLogsByMultipleConditions(start, end,
traceId, spanId, severityNumber, severityText, search);
+ return historyDataReader.queryLogsByMultipleConditions(start, end,
traceId, spanId, severityNumber, severityText);
}
- private Page<LogEntry> getPagedLogs(Long start, Long end, String traceId,
String spanId,
- Integer severityNumber, String
severityText, String search,
- Integer pageIndex, Integer pageSize) {
+ private Page<LogEntry> getPagedLogs(Long start, Long end, String traceId,
String spanId,
+ Integer severityNumber, String
severityText, Integer pageIndex, Integer pageSize) {
// Calculate pagination parameters
int offset = pageIndex * pageSize;
-
+
// Get total count and paginated data
- long totalElements =
historyDataReader.countLogsByMultipleConditions(start, end, traceId, spanId,
severityNumber, severityText, search);
+ long totalElements =
historyDataReader.countLogsByMultipleConditions(start, end, traceId, spanId,
severityNumber, severityText);
List<LogEntry> pagedLogs =
historyDataReader.queryLogsByMultipleConditionsWithPagination(
- start, end, traceId, spanId, severityNumber, severityText,
search, offset, pageSize);
-
+ start, end, traceId, spanId, severityNumber, severityText, offset,
pageSize);
+
// Create PageRequest (sorted by timestamp descending)
Sort sort = Sort.by(Sort.Direction.DESC, "timeUnixNano");
PageRequest pageRequest = PageRequest.of(pageIndex, pageSize, sort);
-
+
// Return Spring Data Page object
return new PageImpl<>(pagedLogs, pageRequest, totalElements);
}
-}
\ No newline at end of file
+}
+
+
diff --git
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
index 3c1afb932a..96173c2a24 100644
---
a/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
+++
b/hertzbeat-log/src/test/java/org/apache/hertzbeat/log/controller/LogQueryControllerTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -87,26 +87,24 @@ class LogQueryControllerTest {
List<LogEntry> mockLogs = Arrays.asList(logEntry1, logEntry2);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.countLogsByMultipleConditions(anyLong(),
anyLong(), anyString(),
- anyString(), anyInt(), anyString(), isNull())).thenReturn(2L);
- // Fixed: Added searchContent param matcher (7th arg)
-
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(anyLong(),
anyLong(),
- anyString(), anyString(), anyInt(), anyString(), isNull(),
anyInt(), anyInt()))
+ when(historyDataReader.countLogsByMultipleConditions(anyLong(),
anyLong(), anyString(),
+ anyString(), anyInt(), anyString())).thenReturn(2L);
+
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(anyLong(),
anyLong(),
+ anyString(), anyString(), anyInt(), anyString(), anyInt(),
anyInt()))
.thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/list")
- .param("start", "1734005477000")
- .param("end", "1734005478000")
- .param("traceId", "trace123")
- .param("spanId", "span456")
- .param("severityNumber", "9")
- .param("severityText", "INFO")
- .param("pageIndex", "0")
- .param("pageSize", "20")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/list")
+ .param("start", "1734005477000")
+ .param("end", "1734005478000")
+ .param("traceId", "trace123")
+ .param("spanId", "span456")
+ .param("severityNumber", "9")
+ .param("severityText", "INFO")
+ .param("pageIndex", "0")
+ .param("pageSize", "20")
+ )
.andDo(print())
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
@@ -129,18 +127,16 @@ class LogQueryControllerTest {
.build()
);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.countLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull(), isNull())).thenReturn(1L);
- // Fixed: Added searchContent param matcher (7th arg)
-
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(isNull(),
isNull(),
- isNull(), isNull(), isNull(), isNull(), isNull(), eq(0),
eq(20)))
+ when(historyDataReader.countLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull())).thenReturn(1L);
+
when(historyDataReader.queryLogsByMultipleConditionsWithPagination(isNull(),
isNull(),
+ isNull(), isNull(), isNull(), isNull(), eq(0), eq(20)))
.thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/list")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/list")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.content").isArray())
@@ -167,14 +163,13 @@ class LogQueryControllerTest {
LogEntry.builder().severityNumber(21).build()
);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/overview")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/overview")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.totalCount").value(8))
@@ -193,16 +188,15 @@ class LogQueryControllerTest {
LogEntry.builder().severityNumber(17).build()
);
- // Fixed: Added searchContent param matcher (7th arg)
-
when(historyDataReader.queryLogsByMultipleConditions(eq(1734005477000L),
eq(1734005478000L),
- isNull(), isNull(), isNull(), isNull(),
isNull())).thenReturn(mockLogs);
+
when(historyDataReader.queryLogsByMultipleConditions(eq(1734005477000L),
eq(1734005478000L),
+ isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/overview")
- .param("start", "1734005477000")
- .param("end", "1734005478000")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/overview")
+ .param("start", "1734005477000")
+ .param("end", "1734005478000")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.totalCount").value(2));
@@ -223,14 +217,13 @@ class LogQueryControllerTest {
LogEntry.builder().build() // null values
);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trace-coverage")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trace-coverage")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.traceCoverage.withTrace").value(3))
@@ -251,14 +244,13 @@ class LogQueryControllerTest {
LogEntry.builder().timeUnixNano(1734009077630000000L).build()
);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trend")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trend")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.hourlyStats").isMap());
@@ -271,17 +263,16 @@ class LogQueryControllerTest {
LogEntry.builder().timeUnixNano(null).build() // This should
be filtered out
);
- // Fixed: Added searchContent param matcher (7th arg)
- when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
- isNull(), isNull(), isNull(), isNull())).thenReturn(mockLogs);
+ when(historyDataReader.queryLogsByMultipleConditions(isNull(),
isNull(), isNull(),
+ isNull(), isNull(), isNull())).thenReturn(mockLogs);
mockMvc.perform(
- MockMvcRequestBuilders
- .get("/api/logs/stats/trend")
- )
+ MockMvcRequestBuilders
+ .get("/api/logs/stats/trend")
+ )
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int)
CommonConstants.SUCCESS_CODE))
.andExpect(jsonPath("$.data.hourlyStats").isMap());
}
-}
\ No newline at end of file
+}
diff --git a/hertzbeat-startup/src/main/resources/application-test.yml b/hertzbeat-startup/src/main/resources/application-test.yml
index 53bcfa3b00..4aa13715ff 100644
--- a/hertzbeat-startup/src/main/resources/application-test.yml
+++ b/hertzbeat-startup/src/main/resources/application-test.yml
@@ -91,7 +91,6 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
- postgres-endpoint: localhost:4003
database: public
username: greptime
password: greptime
diff --git a/hertzbeat-startup/src/main/resources/application.yml b/hertzbeat-startup/src/main/resources/application.yml
index 8d36940e2c..e141b4606c 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -209,7 +209,6 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
- postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git a/hertzbeat-warehouse/pom.xml b/hertzbeat-warehouse/pom.xml
index 0d4ba07e18..95a32a5012 100644
--- a/hertzbeat-warehouse/pom.xml
+++ b/hertzbeat-warehouse/pom.xml
@@ -29,20 +29,24 @@
<name>${project.artifactId}</name>
<dependencies>
+ <!-- common -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-common</artifactId>
<scope>provided</scope>
</dependency>
+ <!-- plugin -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-plugin</artifactId>
<scope>provided</scope>
</dependency>
+ <!-- util -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
+ <!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -57,16 +61,13 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- <scope>provided</scope>
- </dependency>
+ <!-- taos-jdbc driver -->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>${taos-jdbcdriver.version}</version>
</dependency>
+ <!-- IoTDB -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
@@ -82,16 +83,19 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- QuestDB -->
<dependency>
<groupId>org.questdb</groupId>
<artifactId>questdb</artifactId>
<version>${questdb.version}</version>
</dependency>
+ <!-- influxdb Here, support for version 1.7, use influxdb-java, Full
support for version 2.x requires influxdb-client-java -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
+ <!-- greptimedb -->
<dependency>
<groupId>io.greptime</groupId>
<artifactId>ingester-all</artifactId>
@@ -115,6 +119,7 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -125,11 +130,13 @@
</exclusion>
</exclusions>
</dependency>
+ <!--redis-->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<scope>provided</scope>
</dependency>
+ <!-- swagger -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
@@ -143,15 +150,11 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
- <dependency>
- <groupId>org.postgresql</groupId>
- <artifactId>postgresql</artifactId>
- </dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
index aab903b745..7e4c64341b 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,199 +19,111 @@
package org.apache.hertzbeat.warehouse.db;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.log.LogEntry;
-import org.apache.hertzbeat.common.util.JsonUtil;
+import org.apache.hertzbeat.common.constants.NetworkConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.common.util.Base64Util;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
-import org.springframework.beans.factory.annotation.Autowired;
+import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.jdbc.core.BeanPropertyRowMapper;
-import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.MediaType;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
-import java.beans.PropertyDescriptor;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
- * query executor for GreptimeDB SQL via JDBC
+ * query executor for GreptimeDB SQL
*/
@Slf4j
@Component("greptimeSqlQueryExecutor")
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
public class GreptimeSqlQueryExecutor extends SqlQueryExecutor {
+ private static final String QUERY_PATH = "/v1/sql";
private static final String DATASOURCE = "Greptime-sql";
- private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";
- private static final String JDBC_URL_PREFIX = "jdbc:postgresql://";
- private final JdbcTemplate jdbcTemplate;
- private final HikariDataSource dataSource;
+ private final GreptimeProperties greptimeProperties;
- @Autowired
- public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties) {
- super(null, null); // No longer using RestTemplate or HttpSqlProperties
- // Initialize JDBC DataSource
- this.dataSource = new HikariDataSource();
-
- // Construct JDBC URL: jdbc:postgresql://endpoint/database
- String jdbcUrl = JDBC_URL_PREFIX +
greptimeProperties.postgresEndpoint() + "/" + greptimeProperties.database();
- this.dataSource.setJdbcUrl(jdbcUrl);
-
- // Fixed driver class name for PostgreSQL protocol
- this.dataSource.setDriverClassName(DRIVER_CLASS_NAME);
-
- if (greptimeProperties.username() != null) {
- this.dataSource.setUsername(greptimeProperties.username());
- }
- if (greptimeProperties.password() != null) {
- this.dataSource.setPassword(greptimeProperties.password());
- }
- this.dataSource.setMaximumPoolSize(10);
- this.dataSource.setMinimumIdle(2);
- this.dataSource.setConnectionTimeout(30000);
-
- this.jdbcTemplate = new JdbcTemplate(this.dataSource);
- log.info("Initialized GreptimeDB JDBC connection to {}", jdbcUrl);
- }
-
- /**
- * Constructor for compatibility with existing tests.
- * delegating to the main constructor.
- * @param greptimeProperties greptime properties
- * @param restTemplate (unused) rest template
- */
public GreptimeSqlQueryExecutor(GreptimeProperties greptimeProperties,
RestTemplate restTemplate) {
- this(greptimeProperties);
- }
-
- /**
- * Constructor for testing purposes only.
- * @param jdbcTemplate Mocked JdbcTemplate
- */
- public GreptimeSqlQueryExecutor(JdbcTemplate jdbcTemplate) {
- super(null, null);
- this.dataSource = null;
- this.jdbcTemplate = jdbcTemplate;
+ super(restTemplate, new
SqlQueryExecutor.HttpSqlProperties(greptimeProperties.httpEndpoint() +
QUERY_PATH,
+ greptimeProperties.username(), greptimeProperties.password()));
+ this.greptimeProperties = greptimeProperties;
}
@Override
- public List<Map<String, Object>> execute(String sql) {
- log.debug("Executing GreptimeDB SQL: {}", sql);
+ public List<Map<String, Object>> execute(String queryString) {
+ List<Map<String, Object>> results = new LinkedList<>();
try {
- return jdbcTemplate.queryForList(sql);
- } catch (Exception e) {
- log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
- throw e;
- }
- }
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+ headers.setAccept(List.of(MediaType.APPLICATION_JSON));
+ if (StringUtils.hasText(greptimeProperties.username())
+ && StringUtils.hasText(greptimeProperties.password())) {
+ String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
+ String encodedAuth = Base64Util.encode(authStr);
+ headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
+ }
- /**
- * Execute SQL query with arguments (Prepared Statement)
- * @param sql SQL query with ? placeholders
- * @param args Arguments for placeholders
- * @return List of rows
- */
- public List<LogEntry> query(String sql, Object... args) {
- log.debug("Executing GreptimeDB SQL: {} with args: {}", sql, args);
- try {
- // Use custom RowMapper that extends BeanPropertyRowMapper
- return jdbcTemplate.query(sql, new GreptimeLogEntryRowMapper(),
args);
- } catch (Exception e) {
- log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
- throw e;
- }
- }
+ String requestBody = "sql=" + queryString;
+ HttpEntity<String> httpEntity = new HttpEntity<>(requestBody,
headers);
- /**
- * Execute count SQ
- * @param sql SQL
- * @return count
- */
- public Long count(String sql, Object... args) {
- try {
- return jdbcTemplate.queryForObject(sql, Long.class, args);
+ String url = greptimeProperties.httpEndpoint() + QUERY_PATH;
+ if (StringUtils.hasText(greptimeProperties.database())) {
+ url += "?db=" + greptimeProperties.database();
+ }
+
+ ResponseEntity<GreptimeSqlQueryContent> responseEntity =
restTemplate.exchange(url,
+ HttpMethod.POST, httpEntity,
GreptimeSqlQueryContent.class);
+
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
+ GreptimeSqlQueryContent responseBody =
responseEntity.getBody();
+ if (responseBody != null && responseBody.getCode() == 0
+ && responseBody.getOutput() != null &&
!responseBody.getOutput().isEmpty()) {
+
+ for (GreptimeSqlQueryContent.Output output :
responseBody.getOutput()) {
+ if (output.getRecords() != null &&
output.getRecords().getRows() != null) {
+ GreptimeSqlQueryContent.Output.Records.Schema
schema = output.getRecords().getSchema();
+ List<List<Object>> rows =
output.getRecords().getRows();
+
+ for (List<Object> row : rows) {
+ Map<String, Object> rowMap = new HashMap<>();
+ if (schema != null &&
schema.getColumnSchemas() != null) {
+ for (int i = 0; i <
Math.min(schema.getColumnSchemas().size(), row.size()); i++) {
+ String columnName =
schema.getColumnSchemas().get(i).getName();
+ Object value = row.get(i);
+ rowMap.put(columnName, value);
+ }
+ } else {
+ for (int i = 0; i < row.size(); i++) {
+ rowMap.put("col_" + i, row.get(i));
+ }
+ }
+ results.add(rowMap);
+ }
+ }
+ }
+ }
+ } else {
+ log.error("query metrics data from greptime failed. {}",
responseEntity);
+ }
} catch (Exception e) {
- log.error("Failed to execute GreptimeDB SQL: {}", sql, e);
- throw e;
+ log.error("query metrics data from greptime error. {}",
e.getMessage(), e);
}
+ return results;
}
@Override
public String getDatasource() {
return DATASOURCE;
}
-
- // Ensure to close the datasource when the bean is destroyed
- public void close() {
- if (this.dataSource != null && !this.dataSource.isClosed()) {
- this.dataSource.close();
- }
- }
-
- /**
- * Custom RowMapper that extends BeanPropertyRowMapper to leverage default
mapping
- * while overriding specific fields that need type conversion (Timestamp
-> Long, JSON String -> Object).
- */
- private static class GreptimeLogEntryRowMapper extends
BeanPropertyRowMapper<LogEntry> {
-
- public GreptimeLogEntryRowMapper() {
- super(LogEntry.class);
- }
-
- @Override
- protected Object getColumnValue(ResultSet rs, int index,
PropertyDescriptor pd) throws SQLException {
- String propertyName = pd.getName();
-
- // 1. Handle Timestamp to Long (nanoseconds) conversion
- if ("timeUnixNano".equals(propertyName) ||
"observedTimeUnixNano".equals(propertyName)) {
- Timestamp timestamp = rs.getTimestamp(index);
- if (timestamp == null) {
- return null;
- }
- long seconds = timestamp.getTime() / 1000;
- long nanos = timestamp.getNanos();
- return seconds * 1_000_000_000L + nanos;
- }
-
- // 2. Handle JSON String to Map conversion (attributes, resource)
- if ("attributes".equals(propertyName) ||
"resource".equals(propertyName)) {
- String json = rs.getString(index);
- if (!StringUtils.hasText(json)) {
- return null;
- }
- try {
- return JsonUtil.fromJson(json, new
TypeReference<Map<String, Object>>() {});
- } catch (Exception e) {
- log.warn("Failed to parse JSON map for {}: {}",
propertyName, json);
- return null;
- }
- }
-
- // 3. Handle JSON String to InstrumentationScope conversion
- if ("instrumentationScope".equals(propertyName)) {
- String json = rs.getString(index);
- if (!StringUtils.hasText(json)) {
- return null;
- }
- try {
- return JsonUtil.fromJson(json,
LogEntry.InstrumentationScope.class);
- } catch (Exception e) {
- log.warn("Failed to parse instrumentationScope: {}", json);
- return null;
- }
- }
-
- // 4. Default handling for other fields (traceId, spanId,
severityText, body, etc.)
- return super.getColumnValue(rs, index, pd);
- }
- }
-}
\ No newline at end of file
+}
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
index 567e27177e..89fe0f3adb 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/HistoryDataReader.java
@@ -65,67 +65,45 @@ public interface HistoryDataReader {
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
- * @param searchContent search content in log body
* @return filtered log entries
*/
default List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long
endTime, String traceId,
String spanId,
Integer severityNumber,
- String severityText,
String searchContent) {
+ String severityText) {
throw new UnsupportedOperationException("query logs by multiple
conditions is not supported");
}
/**
- * Query logs with multiple filter conditions and pagination (Legacy)
- */
- default List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
- String
spanId, Integer severityNumber,
- String
severityText, Integer offset, Integer limit) {
- return queryLogsByMultipleConditionsWithPagination(startTime, endTime,
traceId, spanId, severityNumber, severityText, null, offset, limit);
- }
-
- /**
- * Query logs with multiple filter conditions and pagination including
search content
+ * Query logs with multiple filter conditions and pagination
* @param startTime start time in milliseconds
* @param endTime end time in milliseconds
* @param traceId trace ID filter
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
- * @param searchContent search content in log body
* @param offset pagination offset
* @param limit pagination limit
* @return filtered log entries with pagination
*/
default List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
String
spanId, Integer severityNumber,
- String
severityText, String searchContent,
- Integer
offset, Integer limit) {
+ String
severityText, Integer offset, Integer limit) {
throw new UnsupportedOperationException("query logs by multiple
conditions with pagination is not supported");
}
/**
- * Count logs with multiple filter conditions (Legacy)
- */
- default long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
- String spanId, Integer
severityNumber,
- String severityText) {
- return countLogsByMultipleConditions(startTime, endTime, traceId,
spanId, severityNumber, severityText, null);
- }
-
- /**
- * Count logs with multiple filter conditions including search content
+ * Count logs with multiple filter conditions
* @param startTime start time in milliseconds
* @param endTime end time in milliseconds
* @param traceId trace ID filter
* @param spanId span ID filter
* @param severityNumber severity number filter
* @param severityText severity text filter
- * @param searchContent search content in log body
* @return count of matching log entries
*/
default long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
String spanId, Integer
severityNumber,
- String severityText, String
searchContent) {
+ String severityText) {
throw new UnsupportedOperationException("count logs by multiple
conditions is not supported");
}
-}
\ No newline at end of file
+}
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index c62e66a035..c2834f5ab9 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -26,6 +26,29 @@ import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hertzbeat.common.constants.CommonConstants;
@@ -37,9 +60,9 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.Base64Util;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.common.util.TimePeriodUtil;
-import org.apache.hertzbeat.warehouse.db.GreptimeSqlQueryExecutor;
import
org.apache.hertzbeat.warehouse.store.history.tsdb.AbstractHistoryDataStorage;
import org.apache.hertzbeat.warehouse.store.history.tsdb.vm.PromQlQueryContent;
+import org.apache.hertzbeat.warehouse.db.GreptimeSqlQueryExecutor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@@ -53,30 +76,8 @@ import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.ZonedDateTime;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAmount;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
/**
- * GreptimeDB data storage
+ * GreptimeDB data storage, only supports GreptimeDB version >= v0.5
*/
@Component
@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled",
havingValue = "true")
@@ -94,8 +95,11 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
private static final int LOG_BATCH_SIZE = 500;
private GreptimeDB greptimeDb;
+
private final GreptimeProperties greptimeProperties;
+
private final RestTemplate restTemplate;
+
private final GreptimeSqlQueryExecutor greptimeSqlQueryExecutor;
public GreptimeDbDataStorage(GreptimeProperties greptimeProperties,
RestTemplate restTemplate,
@@ -123,6 +127,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
log.error("[warehouse greptime] Fail to start GreptimeDB client");
return false;
}
+
return true;
}
@@ -161,15 +166,19 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
RowWrapper rowWrapper = metricsData.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();
+
AtomicInteger index = new AtomicInteger(-1);
rowWrapper.cellStream().forEach(cell -> {
index.getAndIncrement();
+
if (CommonConstants.NULL_VALUE.equals(cell.getValue())) {
values[2 + index.get()] = null;
return;
}
+
Boolean label =
cell.getMetadataAsBoolean(MetricDataConstants.LABEL);
Byte type = cell.getMetadataAsByte(MetricDataConstants.TYPE);
+
if (label) {
values[2 + index.get()] = cell.getValue();
} else {
@@ -180,8 +189,10 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
}
}
});
+
table.addRow(values);
}
+
CompletableFuture<Result<WriteOk, Err>> writeFuture =
greptimeDb.write(table);
try {
Result<WriteOk, Err> result = writeFuture.get(10,
TimeUnit.SECONDS);
@@ -201,6 +212,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
Map<String, Long> timeRange = getTimeRange(history);
Long start = timeRange.get(LABEL_KEY_START_TIME);
Long end = timeRange.get(LABEL_KEY_END_TIME);
+
String step = getTimeStep(start, end);
return getHistoryData(start, end, step, instance, app, metrics,
metric);
@@ -216,6 +228,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
Map<String, Long> timeRange = getTimeRange(history);
Long start = timeRange.get(LABEL_KEY_START_TIME);
Long end = timeRange.get(LABEL_KEY_END_TIME);
+
String step = getTimeStep(start, end);
Map<String, List<Value>> instanceValuesMap = getHistoryData(start,
end, step, instance, app, metrics, metric);
@@ -224,25 +237,32 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
// Therefore, we restrict the valid range by obtaining the post-query
timeframe.
// Since `gretime`'s `end` excludes the specified time, we add 4 hours.
List<Value> values =
instanceValuesMap.get(instanceValuesMap.keySet().stream().toList().get(0));
+ // effective time
long effectiveStart = values.get(0).getTime() / 1000;
long effectiveEnd = values.get(values.size() - 1).getTime() / 1000 +
Duration.ofHours(4).getSeconds();
+
String name = getTableName(metrics);
String timeSeriesSelector = name + "{" + LABEL_KEY_INSTANCE + "=\"" +
instance + "\"";
if (!CommonConstants.PROMETHEUS.equals(app)) {
timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD +
"=\"" + metric + "\"";
}
timeSeriesSelector = timeSeriesSelector + "}";
+
try {
+ // max
String finalTimeSeriesSelector = timeSeriesSelector;
URI uri = getUri(effectiveStart, effectiveEnd, step, uriComponents
-> "max_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMax);
+ // min
uri = getUri(effectiveStart, effectiveEnd, step, uriComponents ->
"min_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMin);
+ // avg
uri = getUri(effectiveStart, effectiveEnd, step, uriComponents ->
"avg_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
requestIntervalMetricAndPutValue(uri, instanceValuesMap,
Value::setMean);
} catch (Exception e) {
log.error("query interval metrics data from greptime error. {}",
e.getMessage(), e);
}
+
return instanceValuesMap;
}
@@ -282,6 +302,7 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
* @return step
*/
private String getTimeStep(long start, long end) {
+ // get step
String step = "60s";
if (end - start < Duration.ofDays(7).getSeconds() && end - start >
Duration.ofDays(1).getSeconds()) {
step = "1h";
@@ -310,9 +331,11 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
if (!CommonConstants.PROMETHEUS.equals(app)) {
timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD +
"=\"" + metric + "\"";
}
+
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
HttpEntity<Void> httpEntity = getHttpEntity();
+
String finalTimeSeriesSelector = timeSeriesSelector;
URI uri = getUri(start, end, step, uriComponents -> {
MultiValueMap<String, String> queryParams =
uriComponents.getQueryParams();
@@ -321,13 +344,16 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
}
return null;
});
+
ResponseEntity<PromQlQueryContent> responseEntity = null;
if (uri != null) {
- responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
+ responseEntity = restTemplate.exchange(uri,
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
}
if (responseEntity != null &&
responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from greptime success. {}", uri);
- if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null &&
responseEntity.getBody().getData().getResult() != null) {
+ if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
+ && responseEntity.getBody().getData().getResult() !=
null) {
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -362,7 +388,8 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username()) &&
StringUtils.hasText(greptimeProperties.password())) {
+ if (StringUtils.hasText(greptimeProperties.username())
+ && StringUtils.hasText(greptimeProperties.password())) {
String authStr = greptimeProperties.username() + ":" +
greptimeProperties.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + encodedAuth);
@@ -410,7 +437,8 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
return;
}
HttpEntity<Void> httpEntity = getHttpEntity();
- ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri, HttpMethod.GET, httpEntity,
PromQlQueryContent.class);
+ ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri,
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (!responseEntity.getStatusCode().is2xxSuccessful()) {
log.error("query interval metrics data from greptime failed. {}",
responseEntity);
return;
@@ -447,9 +475,6 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
this.greptimeDb.shutdownGracefully();
this.greptimeDb = null;
}
- if (this.greptimeSqlQueryExecutor != null) {
- this.greptimeSqlQueryExecutor.close();
- }
}
@Override
@@ -457,13 +482,15 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
if (!isServerAvailable()) {
return;
}
+
try {
+ // Create table schema
TableSchema.Builder tableSchemaBuilder =
TableSchema.newBuilder(LOG_TABLE_NAME);
tableSchemaBuilder.addTimestamp("time_unix_nano",
DataType.TimestampNanosecond)
.addField("observed_time_unix_nano",
DataType.TimestampNanosecond)
.addField("severity_number", DataType.Int32)
.addField("severity_text", DataType.String)
- .addField("body", DataType.String)
+ .addField("body", DataType.Json)
.addField("trace_id", DataType.String)
.addField("span_id", DataType.String)
.addField("trace_flags", DataType.Int32)
@@ -471,13 +498,16 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
.addField("resource", DataType.Json)
.addField("instrumentation_scope", DataType.Json)
.addField("dropped_attributes_count", DataType.Int32);
+
Table table = Table.from(tableSchemaBuilder.build());
- Object[] values = new Object[]{
+
+ // Convert LogEntry to table row
+ Object[] values = new Object[] {
logEntry.getTimeUnixNano() != null ?
logEntry.getTimeUnixNano() : System.nanoTime(),
logEntry.getObservedTimeUnixNano() != null ?
logEntry.getObservedTimeUnixNano() : System.nanoTime(),
logEntry.getSeverityNumber(),
logEntry.getSeverityText(),
- logEntry.getBody(),
+ JsonUtil.toJson(logEntry.getBody()),
logEntry.getTraceId(),
logEntry.getSpanId(),
logEntry.getTraceFlags(),
@@ -486,9 +516,13 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
JsonUtil.toJson(logEntry.getInstrumentationScope()),
logEntry.getDroppedAttributesCount()
};
+
table.addRow(values);
+
+ // Write to GreptimeDB
CompletableFuture<Result<WriteOk, Err>> writeFuture =
greptimeDb.write(table);
Result<WriteOk, Err> result = writeFuture.get(10,
TimeUnit.SECONDS);
+
if (result.isOk()) {
log.debug("[warehouse greptime-log] Write successful");
} else {
@@ -502,16 +536,14 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long
endTime, String traceId,
String spanId, Integer
severityNumber,
- String severityText,
String searchContent) {
+ String severityText) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM
").append(LOG_TABLE_NAME);
- List<Object> args = new ArrayList<>();
- buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
+ buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
sql.append(" ORDER BY time_unix_nano DESC");
- // Execute via JDBC executor using parameters
- List<LogEntry> rows =
greptimeSqlQueryExecutor.query(sql.toString(), args.toArray());
- return rows;
+ List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
+ return mapRowsToLogEntries(rows);
} catch (Exception e) {
log.error("[warehouse greptime-log] queryLogsByMultipleConditions
error: {}", e.getMessage(), e);
return List.of();
@@ -521,24 +553,22 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long
startTime, Long endTime, String traceId,
String
spanId, Integer severityNumber,
- String
severityText, String searchContent,
- Integer
offset, Integer limit) {
+ String
severityText, Integer offset, Integer limit) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM
").append(LOG_TABLE_NAME);
- List<Object> args = new ArrayList<>();
- buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
+ buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
sql.append(" ORDER BY time_unix_nano DESC");
+ // Add pagination
if (limit != null && limit > 0) {
- sql.append(" LIMIT ?");
- args.add(limit);
+ sql.append(" LIMIT ").append(limit);
if (offset != null && offset > 0) {
- sql.append(" OFFSET ?");
- args.add(offset);
+ sql.append(" OFFSET ").append(offset);
}
}
- return greptimeSqlQueryExecutor.query(sql.toString(),
args.toArray());
+ List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
+ return mapRowsToLogEntries(rows);
} catch (Exception e) {
log.error("[warehouse greptime-log]
queryLogsByMultipleConditionsWithPagination error: {}", e.getMessage(), e);
return List.of();
@@ -547,14 +577,20 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
@Override
public long countLogsByMultipleConditions(Long startTime, Long endTime,
String traceId,
- String spanId, Integer
severityNumber,
- String severityText, String
searchContent) {
+ String spanId, Integer
severityNumber,
+ String severityText) {
try {
StringBuilder sql = new StringBuilder("SELECT COUNT(*) as count
FROM ").append(LOG_TABLE_NAME);
- List<Object> args = new ArrayList<>();
- buildWhereConditions(sql, args, startTime, endTime, traceId,
spanId, severityNumber, severityText, searchContent);
+ buildWhereConditions(sql, startTime, endTime, traceId, spanId,
severityNumber, severityText);
- return greptimeSqlQueryExecutor.count(sql.toString(),
args.toArray());
+ List<Map<String, Object>> rows =
greptimeSqlQueryExecutor.execute(sql.toString());
+ if (rows != null && !rows.isEmpty()) {
+ Object countObj = rows.get(0).get("count");
+ if (countObj instanceof Number) {
+ return ((Number) countObj).longValue();
+ }
+ }
+ return 0;
} catch (Exception e) {
log.error("[warehouse greptime-log] countLogsByMultipleConditions
error: {}", e.getMessage(), e);
return 0;
@@ -565,6 +601,12 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
return ms * 1_000_000L;
}
+ private static String safeString(String input) {
+ if (input == null) {
+ return "";
+ }
+ return input.replace("'", "''");
+ }
/**
* build WHERE conditions
@@ -575,57 +617,151 @@ public class GreptimeDbDataStorage extends
AbstractHistoryDataStorage {
* @param spanId span id
* @param severityNumber severity number
*/
- private void buildWhereConditions(StringBuilder sql, List<Object> args,
Long startTime, Long endTime,
- String traceId, String spanId, Integer
severityNumber,
- String severityText, String
searchContent) {
+ private void buildWhereConditions(StringBuilder sql, Long startTime, Long
endTime, String traceId,
+ String spanId, Integer severityNumber,
String severityText) {
List<String> conditions = new ArrayList<>();
+
+ // Time range condition
if (startTime != null && endTime != null) {
- conditions.add("time_unix_nano >= ? AND time_unix_nano <= ?");
- args.add(msToNs(startTime));
- args.add(msToNs(endTime));
+ conditions.add("time_unix_nano >= " + msToNs(startTime) + " AND
time_unix_nano <= " + msToNs(endTime));
}
+
+ // TraceId condition
if (StringUtils.hasText(traceId)) {
- conditions.add("trace_id = ?");
- args.add(traceId);
+ conditions.add("trace_id = '" + safeString(traceId) + "'");
}
+
+ // SpanId condition
if (StringUtils.hasText(spanId)) {
- conditions.add("span_id = ?");
- args.add(spanId);
+ conditions.add("span_id = '" + safeString(spanId) + "'");
}
+
+ // Severity condition
if (severityNumber != null) {
- conditions.add("severity_number = ?");
- args.add(severityNumber);
+ conditions.add("severity_number = " + severityNumber);
}
+
+ // SeverityText condition
if (StringUtils.hasText(severityText)) {
- conditions.add("severity_text = ?");
- args.add(severityText);
- }
- if (StringUtils.hasText(searchContent)) {
- // Using CAST(body AS String) to search within JSON/String content.
- // GreptimeDB supports PostgreSQL protocol, this syntax is
generally safe.
- conditions.add("CAST(body AS String) LIKE ?");
- args.add("%" + searchContent + "%");
+ conditions.add("severity_text = '" + safeString(severityText) +
"'");
}
+
+ // Add WHERE clause if there are conditions
if (!conditions.isEmpty()) {
sql.append(" WHERE ").append(String.join(" AND ", conditions));
}
}
+ private List<LogEntry> mapRowsToLogEntries(List<Map<String, Object>> rows)
{
+ List<LogEntry> list = new LinkedList<>();
+ if (rows == null || rows.isEmpty()) {
+ return list;
+ }
+ for (Map<String, Object> row : rows) {
+ try {
+ LogEntry.InstrumentationScope scope = null;
+ Object scopeObj = row.get("instrumentation_scope");
+ if (scopeObj instanceof String scopeStr &&
StringUtils.hasText(scopeStr)) {
+ try {
+ scope = JsonUtil.fromJson(scopeStr,
LogEntry.InstrumentationScope.class);
+ } catch (Exception ignore) {
+ scope = null;
+ }
+ }
+
+ Object bodyObj = parseJsonMaybe(row.get("body"));
+ Map<String, Object> attributes =
castToMap(parseJsonMaybe(row.get("attributes")));
+ Map<String, Object> resource =
castToMap(parseJsonMaybe(row.get("resource")));
+
+ LogEntry entry = LogEntry.builder()
+ .timeUnixNano(castToLong(row.get("time_unix_nano")))
+
.observedTimeUnixNano(castToLong(row.get("observed_time_unix_nano")))
+
.severityNumber(castToInteger(row.get("severity_number")))
+ .severityText(castToString(row.get("severity_text")))
+ .body(bodyObj)
+ .traceId(castToString(row.get("trace_id")))
+ .spanId(castToString(row.get("span_id")))
+ .traceFlags(castToInteger(row.get("trace_flags")))
+ .attributes(attributes)
+ .resource(resource)
+ .instrumentationScope(scope)
+
.droppedAttributesCount(castToInteger(row.get("dropped_attributes_count")))
+ .build();
+ list.add(entry);
+ } catch (Exception e) {
+ log.warn("[warehouse greptime-log] map row to LogEntry error:
{}", e.getMessage());
+ }
+ }
+ return list;
+ }
+
+ private static Object parseJsonMaybe(Object value) {
+ if (value == null) return null;
+ if (value instanceof Map) return value;
+ if (value instanceof String str) {
+ String s = str.trim();
+ if ((s.startsWith("{") && s.endsWith("}")) || (s.startsWith("[")
&& s.endsWith("]"))) {
+ try {
+ return JsonUtil.fromJson(s, Object.class);
+ } catch (Exception e) {
+ return s;
+ }
+ }
+ return s;
+ }
+ return value;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<String, Object> castToMap(Object obj) {
+ if (obj instanceof Map) {
+ return (Map<String, Object>) obj;
+ }
+ return null;
+ }
+
+ private static Long castToLong(Object obj) {
+ if (obj == null) return null;
+ if (obj instanceof Number n) return n.longValue();
+ try {
+ return Long.parseLong(String.valueOf(obj));
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static Integer castToInteger(Object obj) {
+ if (obj == null) return null;
+ if (obj instanceof Number n) return n.intValue();
+ try {
+ return Integer.parseInt(String.valueOf(obj));
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static String castToString(Object obj) {
+ return obj == null ? null : String.valueOf(obj);
+ }
+
@Override
public boolean batchDeleteLogs(List<Long> timeUnixNanos) {
if (!isServerAvailable() || timeUnixNanos == null ||
timeUnixNanos.isEmpty()) {
return false;
}
+
try {
StringBuilder sql = new StringBuilder("DELETE FROM
").append(LOG_TABLE_NAME).append(" WHERE time_unix_nano IN (");
- // Construct placeholders (?,?,?)
- String placeholders = timeUnixNanos.stream().map(t ->
"?").collect(Collectors.joining(", "));
- sql.append(placeholders).append(")");
+ sql.append(timeUnixNanos.stream()
+ .filter(time -> time != null)
+ .map(String::valueOf)
+ .collect(Collectors.joining(", ")));
+ sql.append(")");
- // Convert list to array for varargs
- greptimeSqlQueryExecutor.query(sql.toString(),
timeUnixNanos.toArray());
+ greptimeSqlQueryExecutor.execute(sql.toString());
log.info("[warehouse greptime-log] Batch delete executed
successfully for {} logs", timeUnixNanos.size());
return true;
+
} catch (Exception e) {
log.error("[warehouse greptime-log] batchDeleteLogs error: {}",
e.getMessage(), e);
return false;
diff --git
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
index f356eea5c6..7913eec5fd 100644
---
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
+++
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeProperties.java
@@ -27,16 +27,11 @@ import
org.springframework.boot.context.properties.bind.DefaultValue;
* GrepTimeDB configuration information
*/
@ConfigurationProperties(prefix =
ConfigConstants.FunctionModuleConstants.WAREHOUSE
- + SignConstants.DOT
- + WarehouseConstants.STORE
- + SignConstants.DOT
- + WarehouseConstants.HistoryName.GREPTIME)
-public record GreptimeProperties(
- @DefaultValue("false") boolean enabled,
- @DefaultValue("127.0.0.1:4001") String grpcEndpoints,
- @DefaultValue("http://127.0.0.1:4000") String httpEndpoint,
- @DefaultValue("127.0.0.1:4003") String postgresEndpoint,
- @DefaultValue("public") String database,
- String username,
- String password) {
-}
\ No newline at end of file
+ + SignConstants.DOT
+ + WarehouseConstants.STORE
+ + SignConstants.DOT
+ + WarehouseConstants.HistoryName.GREPTIME)
+public record GreptimeProperties(@DefaultValue("false") boolean enabled,
+ @DefaultValue("127.0.0.1:4001") String grpcEndpoints,
@DefaultValue("http://127.0.0.1:4000") String httpEndpoint,
+ @DefaultValue("public") String database, String username, String
password) {
+}
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
index 9a518c8a54..99f83855ae 100644
---
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/db/GreptimeSqlQueryExecutorTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,23 +21,26 @@ package org.apache.hertzbeat.warehouse.db;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.hertzbeat.common.entity.log.LogEntry;
+import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
+import
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeSqlQueryContent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
/**
* Test case for {@link GreptimeSqlQueryExecutor}
@@ -46,23 +49,36 @@ import org.springframework.jdbc.core.RowMapper;
class GreptimeSqlQueryExecutorTest {
@Mock
- private JdbcTemplate jdbcTemplate;
+ private GreptimeProperties greptimeProperties;
+
+ @Mock
+ private RestTemplate restTemplate;
private GreptimeSqlQueryExecutor greptimeSqlQueryExecutor;
@BeforeEach
void setUp() {
- // Use the constructor capable of dependency injection for mocking
- greptimeSqlQueryExecutor = new GreptimeSqlQueryExecutor(jdbcTemplate);
+
when(greptimeProperties.httpEndpoint()).thenReturn("http://127.0.0.1:4000");
+ when(greptimeProperties.database()).thenReturn("hertzbeat");
+ when(greptimeProperties.username()).thenReturn("username");
+ when(greptimeProperties.password()).thenReturn("password");
+
+ greptimeSqlQueryExecutor = new
GreptimeSqlQueryExecutor(greptimeProperties, restTemplate);
}
@Test
void testExecuteSuccess() {
- // Mock successful response for queryForList
- List<Map<String, Object>> mockResult = new ArrayList<>();
- mockResult.add(Map.of("metric_name", "cpu", "value", 85.5));
-
-
when(jdbcTemplate.queryForList(any(String.class))).thenReturn(mockResult);
+ // Mock successful response
+ GreptimeSqlQueryContent mockResponse = createMockResponse();
+ ResponseEntity<GreptimeSqlQueryContent> responseEntity =
+ new ResponseEntity<>(mockResponse, HttpStatus.OK);
+
+ when(restTemplate.exchange(
+ any(String.class),
+ eq(HttpMethod.POST),
+ any(HttpEntity.class),
+ eq(GreptimeSqlQueryContent.class)
+ )).thenReturn(responseEntity);
// Execute
List<Map<String, Object>> result =
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics");
@@ -77,41 +93,48 @@ class GreptimeSqlQueryExecutorTest {
@Test
void testExecuteError() {
// Mock error response
- when(jdbcTemplate.queryForList(any(String.class))).thenThrow(new
RuntimeException("Connection error"));
+ when(restTemplate.exchange(
+ any(String.class),
+ eq(HttpMethod.POST),
+ any(HttpEntity.class),
+ eq(GreptimeSqlQueryContent.class)
+ )).thenThrow(new RuntimeException("Connection error"));
+
+ // Execute
+ List<Map<String, Object>> result =
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics");
- // Execute and verify exception
- assertThrows(RuntimeException.class, () ->
greptimeSqlQueryExecutor.execute("SELECT * FROM metrics"));
+ // Verify returns empty list on error
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
}
- @Test
- void testQuerySuccess() {
- // Mock success response for query (using RowMapper)
- List<LogEntry> mockLogs =
List.of(LogEntry.builder().traceId("123").build());
- when(jdbcTemplate.query(any(String.class), any(RowMapper.class),
any(Object[].class)))
- .thenReturn(mockLogs);
+ private GreptimeSqlQueryContent createMockResponse() {
+ GreptimeSqlQueryContent response = new GreptimeSqlQueryContent();
+ response.setCode(0);
- // Execute
- List<LogEntry> result = greptimeSqlQueryExecutor.query("SELECT * FROM
logs WHERE id = ?", 1);
+ // Create simple schema
+ List<GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema>
columnSchemas = new ArrayList<>();
+ columnSchemas.add(new
GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema("metric_name",
"String"));
+ columnSchemas.add(new
GreptimeSqlQueryContent.Output.Records.Schema.ColumnSchema("value", "Float64"));
- // Verify
- assertNotNull(result);
- assertEquals(1, result.size());
- assertEquals("123", result.get(0).getTraceId());
+ GreptimeSqlQueryContent.Output.Records.Schema schema =
+ new GreptimeSqlQueryContent.Output.Records.Schema();
+ schema.setColumnSchemas(columnSchemas);
- // Verify args passed
- verify(jdbcTemplate).query(eq("SELECT * FROM logs WHERE id = ?"),
any(RowMapper.class), eq(1));
- }
+ // Create simple row
+ List<List<Object>> rows = new ArrayList<>();
+ rows.add(List.of("cpu", 85.5));
- @Test
- void testCountSuccess() {
- // Mock success response for queryForObject (count)
- when(jdbcTemplate.queryForObject(any(String.class), eq(Long.class),
any(Object[].class)))
- .thenReturn(10L);
+ // Build response structure
+ GreptimeSqlQueryContent.Output.Records records =
+ new GreptimeSqlQueryContent.Output.Records();
+ records.setSchema(schema);
+ records.setRows(rows);
- // Execute
- Long count = greptimeSqlQueryExecutor.count("SELECT COUNT(*) FROM
logs");
+ GreptimeSqlQueryContent.Output output = new
GreptimeSqlQueryContent.Output();
+ output.setRecords(records);
- // Verify
- assertEquals(10L, count);
+ response.setOutput(List.of(output));
+ return response;
}
-}
\ No newline at end of file
+}
diff --git
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
index 4b4a825608..45a1234c11 100644
---
a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
+++
b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java
@@ -82,7 +82,7 @@ class GreptimeDbDataStorageTest {
@Mock
private GreptimeDB greptimeDb;
-
+
private GreptimeDbDataStorage greptimeDbDataStorage;
@BeforeEach
@@ -120,14 +120,14 @@ class GreptimeDbDataStorageTest {
void testSaveData() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
-
+
// Mock the write result
@SuppressWarnings("unchecked")
Result<WriteOk, Err> mockResult = mock(Result.class);
when(mockResult.isOk()).thenReturn(true);
CompletableFuture<Result<WriteOk, Err>> mockFuture =
CompletableFuture.completedFuture(mockResult);
when(greptimeDb.write(any(Table.class))).thenReturn(mockFuture);
-
+
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
// Test with valid metrics data
@@ -153,7 +153,7 @@ class GreptimeDbDataStorageTest {
@Test
void testGetHistoryMetricData() {
greptimeDbDataStorage = new GreptimeDbDataStorage(greptimeProperties,
restTemplate, greptimeSqlQueryExecutor);
-
+
PromQlQueryContent content = createMockPromQlQueryContent();
ResponseEntity<PromQlQueryContent> responseEntity = new
ResponseEntity<>(content, HttpStatus.OK);
@@ -172,14 +172,14 @@ class GreptimeDbDataStorageTest {
void testSaveLogData() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
-
+
// Mock the write result
@SuppressWarnings("unchecked")
Result<WriteOk, Err> mockResult = mock(Result.class);
when(mockResult.isOk()).thenReturn(true);
CompletableFuture<Result<WriteOk, Err>> mockFuture =
CompletableFuture.completedFuture(mockResult);
when(greptimeDb.write(any(Table.class))).thenReturn(mockFuture);
-
+
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
LogEntry logEntry = createMockLogEntry();
@@ -194,61 +194,51 @@ class GreptimeDbDataStorageTest {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
-
- // Mock list return for query
- List<LogEntry> mockLogs = List.of(createMockLogEntry());
-
- // The query passes 7 arguments: start, end, traceId, spanId,
severityNumber, severityText, content
- when(greptimeSqlQueryExecutor.query(anyString(), any(), any(),
any(), any(), any(), any(), any()))
- .thenReturn(mockLogs);
-
+ List<Map<String, Object>> mockLogRows = createMockLogRows();
+
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(mockLogRows);
+
// Test basic query
List<LogEntry> result =
greptimeDbDataStorage.queryLogsByMultipleConditions(
- System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), "trace123", "span456", 1, "INFO", "content"
+ System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), "trace123", "span456", 1, "INFO"
);
assertNotNull(result);
assertEquals(1, result.size());
assertEquals("trace123", result.get(0).getTraceId());
- // Mock count return
- // The count query passes 2 arguments: start, end (other fields
are null)
- when(greptimeSqlQueryExecutor.count(anyString(), any(),
any())).thenReturn(5L);
+ // Test count query
+ List<Map<String, Object>> mockCountResult =
List.of(Map.of("count", 5L));
+
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(mockCountResult);
long count = greptimeDbDataStorage.countLogsByMultipleConditions(
- System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), null, null, null, null, null
+ System.currentTimeMillis() - 3600000,
System.currentTimeMillis(), null, null, null, null
);
assertEquals(5L, count);
-
+
// Test count query with executor error
- when(greptimeSqlQueryExecutor.count(anyString(), any(),
any())).thenThrow(new RuntimeException("Database error"));
- long errorCount =
greptimeDbDataStorage.countLogsByMultipleConditions(System.currentTimeMillis()
- 3600000, System.currentTimeMillis(), null, null, null, null, null);
+ when(greptimeSqlQueryExecutor.execute(anyString())).thenThrow(new
RuntimeException("Database error"));
+ long errorCount =
greptimeDbDataStorage.countLogsByMultipleConditions(System.currentTimeMillis()
- 3600000, System.currentTimeMillis(), null, null, null, null);
assertEquals(0L, errorCount);
}
}
-
+
@Test
void testQueryLogsWithPagination() {
try (MockedStatic<GreptimeDB> mockedStatic =
mockStatic(GreptimeDB.class)) {
mockedStatic.when(() ->
GreptimeDB.create(any())).thenReturn(greptimeDb);
greptimeDbDataStorage = new
GreptimeDbDataStorage(greptimeProperties, restTemplate,
greptimeSqlQueryExecutor);
-
- List<LogEntry> mockLogs = List.of(createMockLogEntry());
- // The query passes 4 arguments: start, end, limit, offset
- when(greptimeSqlQueryExecutor.query(anyString(), any(), any(),
any(), any())).thenReturn(mockLogs);
-
+
when(greptimeSqlQueryExecutor.execute(anyString())).thenReturn(createMockLogRows());
+
ArgumentCaptor<String> sqlCaptor =
ArgumentCaptor.forClass(String.class);
greptimeDbDataStorage.queryLogsByMultipleConditionsWithPagination(
System.currentTimeMillis() - 3600000,
System.currentTimeMillis(),
- null, null, null, null, null, 1, 10
+ null, null, null, null, 1, 10
);
- // Verify that the query method was called with the SQL containing
LIMIT and OFFSET
- // And 4 parameters
- verify(greptimeSqlQueryExecutor).query(sqlCaptor.capture(), any(),
any(), any(), any());
+ verify(greptimeSqlQueryExecutor).execute(sqlCaptor.capture());
String capturedSql = sqlCaptor.getValue();
-
- assertTrue(capturedSql.toLowerCase().contains("limit ?"));
- assertTrue(capturedSql.toLowerCase().contains("offset ?"));
+
+ assertTrue(capturedSql.toLowerCase().contains("limit 10"));
+ assertTrue(capturedSql.toLowerCase().contains("offset 1"));
}
}
@@ -262,8 +252,7 @@ class GreptimeDbDataStorageTest {
// Test with valid list
boolean result = greptimeDbDataStorage.batchDeleteLogs(List.of(1L,
2L));
assertTrue(result);
- // Verify query method is called with 2 arguments (for 2 IDs)
- verify(greptimeSqlQueryExecutor, times(1)).query(anyString(),
eq(1L), eq(2L));
+ verify(greptimeSqlQueryExecutor, times(1)).execute(anyString());
}
}
@@ -276,8 +265,7 @@ class GreptimeDbDataStorageTest {
// Test with empty list
boolean emptyResult =
greptimeDbDataStorage.batchDeleteLogs(Collections.emptyList());
assertFalse(emptyResult);
- // Verify query was never called
- verify(greptimeSqlQueryExecutor, never()).query(anyString(),
any());
+ verify(greptimeSqlQueryExecutor, never()).execute(anyString());
}
}
@@ -290,8 +278,7 @@ class GreptimeDbDataStorageTest {
// Test with null list
boolean nullResult = greptimeDbDataStorage.batchDeleteLogs(null);
assertFalse(nullResult);
- // Verify query was never called
- verify(greptimeSqlQueryExecutor, never()).query(anyString(),
any());
+ verify(greptimeSqlQueryExecutor, never()).execute(anyString());
}
}
@@ -312,7 +299,7 @@ class GreptimeDbDataStorageTest {
lenient().when(mockMetricsData.getCode()).thenReturn(CollectRep.Code.SUCCESS);
lenient().when(mockMetricsData.getMetrics()).thenReturn("cpu");
lenient().when(mockMetricsData.getId()).thenReturn(1L);
-
+
if (!hasValues) {
lenient().when(mockMetricsData.getValues()).thenReturn(Collections.emptyList());
return mockMetricsData;
@@ -324,36 +311,36 @@ class GreptimeDbDataStorageTest {
lenient().when(mockField1.getName()).thenReturn("usage");
lenient().when(mockField1.getLabel()).thenReturn(false);
lenient().when(mockField1.getType()).thenReturn((int)
CommonConstants.TYPE_NUMBER);
-
+
CollectRep.Field mockField2 = mock(CollectRep.Field.class);
lenient().when(mockField2.getName()).thenReturn("instance");
lenient().when(mockField2.getLabel()).thenReturn(true);
lenient().when(mockField2.getType()).thenReturn((int)
CommonConstants.TYPE_STRING);
-
+
lenient().when(mockMetricsData.getFields()).thenReturn(List.of(mockField1,
mockField2));
-
+
// Create ValueRow mock
CollectRep.ValueRow mockValueRow = mock(CollectRep.ValueRow.class);
lenient().when(mockValueRow.getColumnsList()).thenReturn(List.of("server1",
"85.5"));
-
+
lenient().when(mockMetricsData.getValues()).thenReturn(List.of(mockValueRow));
// Mock RowWrapper for readRow()
RowWrapper mockRowWrapper = mock(RowWrapper.class);
lenient().when(mockRowWrapper.hasNextRow()).thenReturn(true, false);
lenient().when(mockRowWrapper.nextRow()).thenReturn(mockRowWrapper);
-
+
// Mock cell stream
ArrowCell mockCell1 = mock(ArrowCell.class);
lenient().when(mockCell1.getValue()).thenReturn("85.5");
lenient().when(mockCell1.getMetadataAsBoolean(any())).thenReturn(false);
lenient().when(mockCell1.getMetadataAsByte(any())).thenReturn(CommonConstants.TYPE_NUMBER);
-
+
ArrowCell mockCell2 = mock(ArrowCell.class);
lenient().when(mockCell2.getValue()).thenReturn("server1");
lenient().when(mockCell2.getMetadataAsBoolean(any())).thenReturn(true);
lenient().when(mockCell2.getMetadataAsByte(any())).thenReturn(CommonConstants.TYPE_STRING);
-
+
lenient().when(mockRowWrapper.cellStream()).thenReturn(java.util.stream.Stream.of(mockCell1,
mockCell2));
lenient().when(mockMetricsData.readRow()).thenReturn(mockRowWrapper);
@@ -389,4 +376,15 @@ class GreptimeDbDataStorageTest {
.spanId("span456")
.build();
}
+
+ private List<Map<String, Object>> createMockLogRows() {
+ Map<String, Object> row = new HashMap<>();
+ row.put("time_unix_nano", System.nanoTime());
+ row.put("severity_text", "INFO");
+ row.put("body", "\"Test log message\"");
+ row.put("trace_id", "trace123");
+ row.put("span_id", "span456");
+
+ return List.of(row);
+ }
}
\ No newline at end of file
diff --git a/script/application.yml b/script/application.yml
index 8d36940e2c..e141b4606c 100644
--- a/script/application.yml
+++ b/script/application.yml
@@ -209,7 +209,6 @@ warehouse:
enabled: false
grpc-endpoints: localhost:4001
http-endpoint: http://localhost:4000
- postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git
a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
index ea8347f19b..1bfe96d105 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
@@ -149,7 +149,6 @@ warehouse:
enabled: true
grpc-endpoints: greptime:4001
http-endpoint: http://greptime:4000
- postgres-endpoint: localhost:4003
# if you config other database name, you should create them first
database: public
username: greptime
diff --git a/web-app/src/app/routes/log/log-manage/log-manage.component.html
b/web-app/src/app/routes/log/log-manage/log-manage.component.html
index 3ee454cd9d..cb95696ac9 100644
--- a/web-app/src/app/routes/log/log-manage/log-manage.component.html
+++ b/web-app/src/app/routes/log/log-manage/log-manage.component.html
@@ -5,9 +5,9 @@
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,7 +23,9 @@
<nz-divider></nz-divider>
+<!-- Combined Filter and Statistics Card -->
<nz-card [nzTitle]="'log.manage.title' | i18n" class="manager-card">
+ <!-- Filters Section -->
<div class="filters-container">
<nz-space nzSize="middle" nzWrap>
<nz-range-picker *nzSpaceItem [(ngModel)]="timeRange" nzShowTime
nzFormat="yyyy-MM-dd HH:mm:ss"></nz-range-picker>
@@ -54,7 +56,6 @@
<nz-auto-option nzValue="ERROR">ERROR</nz-auto-option>
<nz-auto-option nzValue="FATAL">FATAL</nz-auto-option>
</nz-autocomplete>
- <input *nzSpaceItem nz-input [placeholder]="'log.manage.search' | i18n"
[(ngModel)]="searchContent" style="width: 200px" />
<button *nzSpaceItem nz-button nzType="primary" (click)="query()">
<i nz-icon nzType="search"></i> {{ 'log.manage.search' | i18n }}
</button>
@@ -100,7 +101,9 @@
</ng-template>
</div>
+ <!-- Statistics Section (Collapsible) -->
<div *ngIf="showStatistics">
+ <!-- Statistics Overview -->
<div nz-row [nzGutter]="16" style="margin-bottom: 16px">
<div nz-col nzXs="12" nzSm="4">
<nz-card>
@@ -176,6 +179,7 @@
</div>
</div>
+ <!-- Charts Section -->
<div nz-row [nzGutter]="16">
<div nz-col nzXs="24" nzMd="8">
<nz-card [nzTitle]="'log.manage.chart.severity-distribution' | i18n"
[nzSize]="'small'">
@@ -196,6 +200,7 @@
</div>
</nz-card>
+<!-- Log Table -->
<nz-table
#fixedTable
[nzData]="data"
@@ -314,6 +319,7 @@
</tbody>
</nz-table>
+<!-- Log Details Modal -->
<nz-modal
[(nzVisible)]="isModalVisible"
[nzTitle]="'log.manage.log-entry-details' | i18n"
@@ -323,6 +329,7 @@
>
<ng-container *nzModalContent>
<div *ngIf="selectedLogEntry" class="log-details-modal">
+ <!-- Basic Info Section -->
<nz-card [nzTitle]="'log.manage.basic-information' | i18n"
[nzSize]="'small'" class="basic-info-card">
<div class="basic-info">
<div class="info-row">
@@ -346,6 +353,7 @@
</div>
</nz-card>
+ <!-- JSON Details Section -->
<nz-card [nzTitle]="'log.manage.complete-json-data' | i18n"
[nzSize]="'small'">
<div class="json-content">
<button
diff --git a/web-app/src/app/routes/log/log-manage/log-manage.component.ts
b/web-app/src/app/routes/log/log-manage/log-manage.component.ts
index 496161769b..1d448931d1 100644
--- a/web-app/src/app/routes/log/log-manage/log-manage.component.ts
+++ b/web-app/src/app/routes/log/log-manage/log-manage.component.ts
@@ -92,7 +92,6 @@ export class LogManageComponent implements OnInit {
severityText?: string;
traceId: string = '';
spanId: string = '';
- searchContent: string = '';
// table with pagination
loading = false;
@@ -303,7 +302,6 @@ export class LogManageComponent implements OnInit {
this.spanId,
this.severityNumber,
this.severityText,
- this.searchContent,
this.pageIndex - 1,
this.pageSize
);
@@ -341,9 +339,8 @@ export class LogManageComponent implements OnInit {
const spanId = this.spanId || undefined;
const severity = this.severityNumber || undefined;
const severityText = this.severityText || undefined;
- const search = this.searchContent || undefined;
- this.logSvc.overviewStats(start, end, traceId, spanId, severity,
severityText, search).subscribe({
+ this.logSvc.overviewStats(start, end, traceId, spanId, severity,
severityText).subscribe({
next: message => {
if (message.code === 0) {
this.overviewStats = message.data || {};
@@ -352,7 +349,7 @@ export class LogManageComponent implements OnInit {
}
});
- this.logSvc.traceCoverageStats(start, end, traceId, spanId, severity,
severityText, search).subscribe({
+ this.logSvc.traceCoverageStats(start, end, traceId, spanId, severity,
severityText).subscribe({
next: message => {
if (message.code === 0) {
this.refreshTraceCoverageChart(message.data || {});
@@ -360,7 +357,7 @@ export class LogManageComponent implements OnInit {
}
});
- this.logSvc.trendStats(start, end, traceId, spanId, severity,
severityText, search).subscribe({
+ this.logSvc.trendStats(start, end, traceId, spanId, severity,
severityText).subscribe({
next: message => {
if (message.code === 0) {
this.refreshTrendChart(message.data?.hourlyStats || {});
@@ -375,7 +372,6 @@ export class LogManageComponent implements OnInit {
this.traceId = '';
this.spanId = '';
this.severityText = '';
- this.searchContent = '';
this.pageIndex = 1;
this.query();
}
diff --git a/web-app/src/app/service/log.service.ts
b/web-app/src/app/service/log.service.ts
index 0f5b70ea7d..2a4a0a19aa 100644
--- a/web-app/src/app/service/log.service.ts
+++ b/web-app/src/app/service/log.service.ts
@@ -43,7 +43,6 @@ export class LogService {
spanId?: string,
severityNumber?: number,
severityText?: string,
- search?: string,
pageIndex: number = 0,
pageSize: number = 20
): Observable<Message<Page<LogEntry>>> {
@@ -54,7 +53,6 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber',
severityNumber);
if (severityText) params = params.set('severityText', severityText);
- if (search) params = params.set('search', search);
params = params.set('pageIndex', pageIndex);
params = params.set('pageSize', pageSize);
return this.http.get<Message<any>>(logs_list_uri, { params });
@@ -66,8 +64,7 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string,
- search?: string
+ severityText?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -76,7 +73,6 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber',
severityNumber);
if (severityText) params = params.set('severityText', severityText);
- if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_overview_uri, { params });
}
@@ -86,8 +82,7 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string,
- search?: string
+ severityText?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -96,7 +91,6 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber',
severityNumber);
if (severityText) params = params.set('severityText', severityText);
- if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_trend_uri, { params });
}
@@ -106,8 +100,7 @@ export class LogService {
traceId?: string,
spanId?: string,
severityNumber?: number,
- severityText?: string,
- search?: string
+ severityText?: string
): Observable<Message<any>> {
let params = new HttpParams();
if (start != null) params = params.set('start', start);
@@ -116,7 +109,6 @@ export class LogService {
if (spanId) params = params.set('spanId', spanId);
if (severityNumber != null) params = params.set('severityNumber',
severityNumber);
if (severityText) params = params.set('severityText', severityText);
- if (search) params = params.set('search', search);
return this.http.get<Message<any>>(logs_stats_trace_coverage_uri, { params
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]