This is an automated email from the ASF dual-hosted git repository.
zqr10159 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/2.0.0 by this push:
new 6e3bc0e1f3 Preserve log stream attribute filters
6e3bc0e1f3 is described below
commit 6e3bc0e1f38a8d69f020e567597b89aada0a80d2
Author: Logic <[email protected]>
AuthorDate: Wed Jun 10 08:34:47 2026 +0800
Preserve log stream attribute filters
---
.../logs/sse/LogSseFilterCriteria.java | 293 +++++++++++++++++++++
.../logs/controller/LogSseControllerTest.java | 6 +
.../logs/sse/LogSseFilterCriteriaTest.java | 52 ++++
web-next/lib/log-manage/query-state.test.ts | 13 +-
web-next/lib/log-manage/query-state.ts | 3 +
5 files changed, 365 insertions(+), 2 deletions(-)
diff --git
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
index 20f245ecca..844aa9c684 100644
---
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
+++
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
@@ -19,8 +19,15 @@
package org.apache.hertzbeat.observability.logs.sse;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@@ -40,6 +47,24 @@ import static
io.swagger.v3.oas.annotations.media.Schema.AccessMode.READ_ONLY;
@Schema(description = "Log filtering criteria for SSE (Server-Sent Events) log
streaming")
public class LogSseFilterCriteria {
+ // Parsed filter clauses are kept in a key -> String map; the operator of each clause is
+ // encoded into the stored value using the markers below.
+ // "key!=value" is stored as this prefix followed by the value.
+ private static final String LOG_FILTER_NEGATION_PREFIX = "!";
+ // "key IN (...)" / "key NOT IN (...)": prefix followed by the list members joined with
+ // LOG_FILTER_VALUE_DELIMITER.
+ private static final String LOG_FILTER_IN_PREFIX = "__in__:";
+ private static final String LOG_FILTER_NOT_IN_PREFIX = "__not_in__:";
+ // "key CONTAINS text" / "key NOT CONTAINS text": prefix followed by the text to look for.
+ private static final String LOG_FILTER_CONTAINS_PREFIX = "__contains__:";
+ private static final String LOG_FILTER_NOT_CONTAINS_PREFIX =
"__not_contains__:";
+ // "key EXISTS" / "key NOT EXISTS": despite the _PREFIX suffix these are stored as the whole
+ // value (no payload follows) and are compared with equals() when a filter is evaluated.
+ private static final String LOG_FILTER_EXISTS_PREFIX = "__exists__";
+ private static final String LOG_FILTER_NOT_EXISTS_PREFIX =
"__not_exists__";
+ // Separator (the U+001F control character) placed between the members of an encoded IN list.
+ private static final String LOG_FILTER_VALUE_DELIMITER = "\u001F";
+ // Clause patterns, applied with Matcher.matches() to one whole clause, case-insensitively.
+ // Group 1 is the attribute key, group 2 the operator keyword(s), group 3 (where present)
+ // the operand.
+ // List form: key IN (a, b) / key NOT IN (a, b); group 3 includes the parentheses.
+ private static final Pattern LOG_FILTER_LIST_OPERATOR_PATTERN =
Pattern.compile(
+ "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+IN|IN)\\s*(\\(.+\\))$",
+ Pattern.CASE_INSENSITIVE);
+ // Text form: key CONTAINS text / key NOT CONTAINS text.
+ private static final Pattern LOG_FILTER_TEXT_OPERATOR_PATTERN =
Pattern.compile(
+ "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+CONTAINS|CONTAINS)\\s+(.+)$",
+ Pattern.CASE_INSENSITIVE);
+ // Presence form: key EXISTS / key NOT EXISTS (no operand).
+ private static final Pattern LOG_FILTER_PRESENCE_OPERATOR_PATTERN =
Pattern.compile(
+ "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+EXISTS|EXISTS)$",
+ Pattern.CASE_INSENSITIVE);
+
private static final Set<String> WORKSPACE_RESOURCE_KEYS = Set.of(
"hertzbeat.workspace_id",
AuthTokenScopes.CLAIM_WORKSPACE_ID,
@@ -97,6 +122,12 @@ public class LogSseFilterCriteria {
@Schema(description = "HertzBeat entity type resource attribute.", example
= "service", accessMode = READ_WRITE)
private String entityType;
+ /**
+ * Optional filter expression evaluated against the log entry's resource map
+ * ({@code log.getResource()}) in {@code matches}. An expression without text
+ * parses to an empty filter map and therefore filters nothing out.
+ */
+ @Schema(description = "Resource attribute filter expression, for example
service.version=1.2.3", accessMode = READ_WRITE)
+ private String resourceFilter;
+
+ /**
+ * Optional filter expression evaluated against the log entry's attribute map
+ * ({@code log.getAttributes()}) in {@code matches}. An expression without text
+ * parses to an empty filter map and therefore filters nothing out.
+ */
+ @Schema(description = "Log attribute filter expression, for example
http.route:/checkout", accessMode = READ_WRITE)
+ private String attributeFilter;
+
/**
* Workspace boundary captured from the authenticated request.
*/
@@ -157,6 +188,12 @@ public class LogSseFilterCriteria {
if (!matchesServiceContext(log)) {
return false;
}
+ if (!matchesAttributes(log.getResource(),
parseLogAttributeFilter(resourceFilter))) {
+ return false;
+ }
+ if (!matchesAttributes(log.getAttributes(),
parseLogAttributeFilter(attributeFilter))) {
+ return false;
+ }
return true;
}
@@ -212,6 +249,262 @@ public class LogSseFilterCriteria {
return null;
}
+    /**
+     * Turns a filter expression into a map of attribute key to encoded expectation.
+     *
+     * <p>The expression is first cut into clauses. Each clause is offered to the list
+     * ({@code IN}), text ({@code CONTAINS}) and presence ({@code EXISTS}) parsers in that
+     * order; a clause none of them accepts is read as {@code key!=value}, {@code key=value}
+     * or {@code key:value}. {@code !=} is looked for first, then {@code =}, and {@code :} only
+     * when neither is present. Clauses with an empty side or an unsafe key are dropped.
+     * A later clause for the same key replaces an earlier one.
+     *
+     * @param filterExpression raw expression; may be {@code null} or blank
+     * @return an unmodifiable map, empty when nothing usable was parsed
+     */
+    private Map<String, String> parseLogAttributeFilter(String filterExpression) {
+        if (!StringUtils.hasText(filterExpression)) {
+            return Collections.emptyMap();
+        }
+        Map<String, String> parsed = new LinkedHashMap<>();
+        for (String clause : splitLogFilterClauses(filterExpression)) {
+            if (!StringUtils.hasText(clause)) {
+                continue;
+            }
+            boolean handledByOperatorParser = appendLogFilterListValues(parsed, clause)
+                    || appendLogFilterTextValue(parsed, clause)
+                    || appendLogFilterPresenceValue(parsed, clause);
+            if (handledByOperatorParser) {
+                continue;
+            }
+            int notEqualsAt = clause.indexOf("!=");
+            boolean negated = notEqualsAt >= 0;
+            int splitAt = negated ? notEqualsAt : clause.indexOf('=');
+            if (splitAt < 0) {
+                splitAt = clause.indexOf(':');
+            }
+            // The separator must leave at least one character before and after it.
+            boolean separatorInside = splitAt > 0 && splitAt < clause.length() - 1;
+            if (!separatorInside) {
+                continue;
+            }
+            int separatorLength = negated ? 2 : 1;
+            String attributeKey = clause.substring(0, splitAt).trim();
+            String attributeValue = stripFilterQuotes(clause.substring(splitAt + separatorLength).trim());
+            if (isSafeAttributeKey(attributeKey) && StringUtils.hasText(attributeValue)) {
+                parsed.put(attributeKey,
+                        negated ? LOG_FILTER_NEGATION_PREFIX + attributeValue : attributeValue);
+            }
+        }
+        if (parsed.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return Map.copyOf(parsed);
+    }
+
+    /**
+     * Recognizes a {@code key IN (...)} or {@code key NOT IN (...)} clause and records it.
+     *
+     * <p>List members are trimmed, unquoted, stripped of blanks and de-duplicated in
+     * first-seen order, then stored under the key as the IN / NOT IN prefix followed by the
+     * members joined with the value delimiter.
+     *
+     * @param filters map receiving the encoded filter
+     * @param token   one filter clause
+     * @return {@code true} if the clause was recorded; {@code false} if it is not a list
+     *         clause, has an unsafe key, or yields no usable member
+     */
+    private boolean appendLogFilterListValues(Map<String, String> filters, String token) {
+        Matcher listMatcher = LOG_FILTER_LIST_OPERATOR_PATTERN.matcher(token);
+        if (!listMatcher.matches()) {
+            return false;
+        }
+        String attributeKey = listMatcher.group(1).trim();
+        String normalizedOperator = listMatcher.group(2).trim().replaceAll("\\s+", " ");
+        String parenthesized = listMatcher.group(3).trim();
+        boolean wrappedInParens = parenthesized.length() >= 2
+                && parenthesized.startsWith("(")
+                && parenthesized.endsWith(")");
+        if (!isSafeAttributeKey(attributeKey) || !wrappedInParens) {
+            return false;
+        }
+        String inner = parenthesized.substring(1, parenthesized.length() - 1);
+        List<String> members = new ArrayList<>();
+        for (String rawMember : splitLogFilterListValues(inner)) {
+            String member = stripFilterQuotes(rawMember.trim());
+            if (StringUtils.hasText(member) && !members.contains(member)) {
+                members.add(member);
+            }
+        }
+        if (members.isEmpty()) {
+            return false;
+        }
+        boolean excluding = "not in".equalsIgnoreCase(normalizedOperator);
+        String encodedMembers = String.join(LOG_FILTER_VALUE_DELIMITER, members);
+        filters.put(attributeKey,
+                (excluding ? LOG_FILTER_NOT_IN_PREFIX : LOG_FILTER_IN_PREFIX) + encodedMembers);
+        return true;
+    }
+
+    /**
+     * Recognizes a {@code key CONTAINS text} or {@code key NOT CONTAINS text} clause and
+     * records it as the matching prefix followed by the unquoted text.
+     *
+     * @param filters map receiving the encoded filter
+     * @param token   one filter clause
+     * @return {@code true} if the clause was recorded; {@code false} if it is not a text
+     *         clause, has an unsafe key, or its text is blank
+     */
+    private boolean appendLogFilterTextValue(Map<String, String> filters, String token) {
+        Matcher textMatcher = LOG_FILTER_TEXT_OPERATOR_PATTERN.matcher(token);
+        if (!textMatcher.matches()) {
+            return false;
+        }
+        String attributeKey = textMatcher.group(1).trim();
+        String normalizedOperator = textMatcher.group(2).trim().replaceAll("\\s+", " ");
+        String needle = stripFilterQuotes(textMatcher.group(3).trim());
+        if (isSafeAttributeKey(attributeKey) && StringUtils.hasText(needle)) {
+            String prefix = "not contains".equalsIgnoreCase(normalizedOperator)
+                    ? LOG_FILTER_NOT_CONTAINS_PREFIX
+                    : LOG_FILTER_CONTAINS_PREFIX;
+            filters.put(attributeKey, prefix + needle);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Recognizes a {@code key EXISTS} or {@code key NOT EXISTS} clause and records the
+     * corresponding presence marker as the value for the key.
+     *
+     * @param filters map receiving the encoded filter
+     * @param token   one filter clause
+     * @return {@code true} if the clause was recorded; {@code false} if it is not a
+     *         presence clause or has an unsafe key
+     */
+    private boolean appendLogFilterPresenceValue(Map<String, String> filters, String token) {
+        Matcher presenceMatcher = LOG_FILTER_PRESENCE_OPERATOR_PATTERN.matcher(token);
+        if (!presenceMatcher.matches()) {
+            return false;
+        }
+        String attributeKey = presenceMatcher.group(1).trim();
+        String normalizedOperator = presenceMatcher.group(2).trim().replaceAll("\\s+", " ");
+        if (isSafeAttributeKey(attributeKey)) {
+            String marker;
+            if ("not exists".equalsIgnoreCase(normalizedOperator)) {
+                marker = LOG_FILTER_NOT_EXISTS_PREFIX;
+            } else {
+                marker = LOG_FILTER_EXISTS_PREFIX;
+            }
+            filters.put(attributeKey, marker);
+            return true;
+        }
+        return false;
+    }
+
+ /**
+ * Splits a filter expression into individual clauses.
+ *
+ * <p>Clauses are separated by a comma or by the word "and" surrounded by single spaces
+ * (case-insensitive). Separators inside single or double quotes, or inside parentheses,
+ * do not split. Each clause is trimmed and blank clauses are discarded by
+ * {@code addLogFilterClause}.
+ *
+ * @param filterExpression the expression to split; must not be {@code null}
+ * @return the non-blank clauses in their original order
+ */
+ private List<String> splitLogFilterClauses(String filterExpression) {
+ List<String> clauses = new ArrayList<>();
+ StringBuilder current = new StringBuilder();
+ // Parenthesis nesting level; separators are only honoured at level 0.
+ int depth = 0;
+ // The quote character currently open, or 0 when outside quotes.
+ char quote = 0;
+ for (int index = 0; index < filterExpression.length(); index++) {
+ char character = filterExpression.charAt(index);
+ // Inside quotes everything is copied verbatim until the same quote character closes it.
+ if (quote != 0) {
+ current.append(character);
+ if (character == quote) {
+ quote = 0;
+ }
+ continue;
+ }
+ if (character == '\'' || character == '"') {
+ quote = character;
+ current.append(character);
+ continue;
+ }
+ if (character == '(') {
+ depth++;
+ current.append(character);
+ continue;
+ }
+ if (character == ')') {
+ // Clamp at 0 so a stray ')' cannot make the depth negative.
+ depth = Math.max(0, depth - 1);
+ current.append(character);
+ continue;
+ }
+ if (depth == 0 && (character == ',' ||
isLogFilterAndDelimiter(filterExpression, index))) {
+ addLogFilterClause(clauses, current);
+ if (character != ',') {
+ // The " and " delimiter is 5 characters long: skip 4 here, the loop's index++ skips the fifth.
+ index += 4;
+ }
+ continue;
+ }
+ current.append(character);
+ }
+ // Flush whatever follows the last separator.
+ addLogFilterClause(clauses, current);
+ return clauses;
+ }
+
+    /**
+     * Splits the inside of an {@code IN (...)} list on commas, leaving commas that sit
+     * inside single or double quotes untouched. Quote characters are kept in the pieces;
+     * each piece is trimmed and blank pieces are discarded.
+     *
+     * @param values the text between the parentheses; must not be {@code null}
+     * @return the non-blank list members in their original order
+     */
+    private List<String> splitLogFilterListValues(String values) {
+        List<String> members = new ArrayList<>();
+        StringBuilder buffer = new StringBuilder();
+        char openQuote = 0;
+        for (char ch : values.toCharArray()) {
+            boolean insideQuotes = openQuote != 0;
+            if (!insideQuotes && ch == ',') {
+                addLogFilterClause(members, buffer);
+                continue;
+            }
+            buffer.append(ch);
+            if (insideQuotes) {
+                if (ch == openQuote) {
+                    openQuote = 0;
+                }
+            } else if (ch == '\'' || ch == '"') {
+                openQuote = ch;
+            }
+        }
+        addLogFilterClause(members, buffer);
+        return members;
+    }
+
+    /**
+     * Moves the buffered text into {@code clauses} and empties the buffer.
+     * The text is trimmed first and is only added when something remains.
+     *
+     * @param clauses list receiving the clause
+     * @param current buffer holding the clause being built; always reset to empty
+     */
+    private void addLogFilterClause(List<String> clauses, StringBuilder current) {
+        String trimmed = current.toString().trim();
+        current.setLength(0);
+        if (!StringUtils.hasText(trimmed)) {
+            return;
+        }
+        clauses.add(trimmed);
+    }
+
+    /**
+     * Tells whether the five characters starting at {@code index} are the clause
+     * delimiter " and " (single spaces on both sides, any letter case).
+     *
+     * @param value text being scanned
+     * @param index position to test
+     * @return {@code true} if the delimiter starts at {@code index}
+     */
+    private boolean isLogFilterAndDelimiter(String value, int index) {
+        final String delimiter = " and ";
+        final int delimiterLength = 5;
+        if (index + delimiterLength > value.length()) {
+            return false;
+        }
+        return value.regionMatches(true, index, delimiter, 0, delimiterLength);
+    }
+
+    /**
+     * Removes one pair of matching surrounding quotes (both single or both double) and
+     * trims what was inside them. Values that are shorter than two characters, or not
+     * wrapped in a matching pair, are returned unchanged.
+     *
+     * @param value text to unquote; must not be {@code null}
+     * @return the unquoted, trimmed content, or {@code value} itself
+     */
+    private String stripFilterQuotes(String value) {
+        int length = value.length();
+        if (length >= 2) {
+            char opening = value.charAt(0);
+            boolean startsWithQuote = opening == '\'' || opening == '"';
+            if (startsWithQuote && value.charAt(length - 1) == opening) {
+                return value.substring(1, length - 1).trim();
+            }
+        }
+        return value;
+    }
+
+    /**
+     * Allowed attribute-key shape, compiled once. {@code String.matches} would compile the
+     * expression again on every call, and keys are validated for every clause each time a
+     * filter expression is parsed.
+     */
+    private static final Pattern LOG_FILTER_SAFE_KEY_PATTERN = Pattern.compile("[A-Za-z0-9_.:-]+");
+
+    /**
+     * Checks that an attribute key consists only of ASCII letters, digits and the
+     * characters {@code _ . : -}, and is not empty.
+     *
+     * @param key candidate key; may be {@code null}
+     * @return {@code true} if the key is non-null and made up solely of allowed characters
+     */
+    private boolean isSafeAttributeKey(String key) {
+        // The pattern requires at least one allowed character, so blank and
+        // whitespace-only keys are rejected without a separate text check.
+        return key != null && LOG_FILTER_SAFE_KEY_PATTERN.matcher(key).matches();
+    }
+
+    /**
+     * Checks a log entry's attribute map against every parsed filter.
+     *
+     * <p>No filters means a match. When the map is missing or empty, the entry matches only
+     * if every filter is an exclusion (negation, NOT IN, NOT CONTAINS or NOT EXISTS).
+     * Otherwise each filter is evaluated with the value resolved for its key and with
+     * whether the map contains that key.
+     *
+     * @param source             attribute map of the log entry; may be {@code null}
+     * @param expectedAttributes parsed filters keyed by attribute key; may be {@code null}
+     * @return {@code true} if all filters are satisfied
+     */
+    private boolean matchesAttributes(Map<String, Object> source, Map<String, String> expectedAttributes) {
+        if (expectedAttributes == null || expectedAttributes.isEmpty()) {
+            return true;
+        }
+        boolean nothingToInspect = source == null || source.isEmpty();
+        for (Map.Entry<String, String> filter : expectedAttributes.entrySet()) {
+            if (nothingToInspect) {
+                if (!isExclusionLogAttributeFilter(filter.getValue())) {
+                    return false;
+                }
+                continue;
+            }
+            boolean satisfied = matchesAttributeFilter(
+                    resolveValue(source, filter.getKey()),
+                    filter.getValue(),
+                    source.containsKey(filter.getKey()));
+            if (!satisfied) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Evaluates one encoded filter value against the actual attribute value.
+     *
+     * <p>The operator is recovered from the encoded value: the two presence markers are
+     * compared as whole values, then the IN, NOT IN, CONTAINS, NOT CONTAINS and negation
+     * prefixes are tried in that order; anything else is a plain comparison.
+     *
+     * <p>NOTE(review): plain {@code key=value} clauses are stored without any marker, so a
+     * literal value that happens to start with one of the prefixes (for example
+     * {@code key=!x} or {@code key=__exists__}) is evaluated as that operator here.
+     *
+     * @param actualValue   value resolved from the log entry for the key; may be {@code null}
+     * @param expectedValue encoded expectation produced by the filter parser
+     * @param keyExists     whether the attribute map contains the key
+     * @return {@code true} if the filter is satisfied
+     */
+    private boolean matchesAttributeFilter(String actualValue, String expectedValue, boolean keyExists) {
+        if (expectedValue == null) {
+            return matchesOptionalValue(actualValue, expectedValue);
+        }
+        if (LOG_FILTER_EXISTS_PREFIX.equals(expectedValue)) {
+            return keyExists;
+        }
+        if (LOG_FILTER_NOT_EXISTS_PREFIX.equals(expectedValue)) {
+            return !keyExists;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_IN_PREFIX)) {
+            String encodedMembers = expectedValue.substring(LOG_FILTER_IN_PREFIX.length());
+            for (String member : splitListLogAttributeValues(encodedMembers)) {
+                if (matchesOptionalValue(actualValue, member)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NOT_IN_PREFIX)) {
+            String encodedMembers = expectedValue.substring(LOG_FILTER_NOT_IN_PREFIX.length());
+            for (String member : splitListLogAttributeValues(encodedMembers)) {
+                if (matchesOptionalValue(actualValue, member)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_CONTAINS_PREFIX)) {
+            String needle = expectedValue.substring(LOG_FILTER_CONTAINS_PREFIX.length());
+            return matchesContainedValue(actualValue, needle);
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NOT_CONTAINS_PREFIX)) {
+            String needle = expectedValue.substring(LOG_FILTER_NOT_CONTAINS_PREFIX.length());
+            return !matchesContainedValue(actualValue, needle);
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NEGATION_PREFIX)) {
+            String rejected = expectedValue.substring(LOG_FILTER_NEGATION_PREFIX.length());
+            return !matchesOptionalValue(actualValue, rejected);
+        }
+        return matchesOptionalValue(actualValue, expectedValue);
+    }
+
+    /**
+     * Tells whether an encoded filter value expresses an exclusion: a negation, a NOT IN
+     * list, a NOT CONTAINS text, or the NOT EXISTS marker.
+     *
+     * @param expectedValue encoded filter value; may be {@code null}
+     * @return {@code true} for exclusion filters, {@code false} otherwise (including {@code null})
+     */
+    private boolean isExclusionLogAttributeFilter(String expectedValue) {
+        if (expectedValue == null) {
+            return false;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NEGATION_PREFIX)) {
+            return true;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NOT_IN_PREFIX)) {
+            return true;
+        }
+        if (expectedValue.startsWith(LOG_FILTER_NOT_CONTAINS_PREFIX)) {
+            return true;
+        }
+        return LOG_FILTER_NOT_EXISTS_PREFIX.equals(expectedValue);
+    }
+
+    /**
+     * Decodes the member list of an IN / NOT IN filter by splitting on the value
+     * delimiter and dropping blank members.
+     *
+     * @param encodedValues delimiter-joined members; may be {@code null} or blank
+     * @return an unmodifiable list of the non-blank members, possibly empty
+     */
+    private List<String> splitListLogAttributeValues(String encodedValues) {
+        if (!StringUtils.hasText(encodedValues)) {
+            return List.of();
+        }
+        String[] pieces = encodedValues.split(Pattern.quote(LOG_FILTER_VALUE_DELIMITER), -1);
+        List<String> members = new ArrayList<>(pieces.length);
+        for (String piece : pieces) {
+            if (StringUtils.hasText(piece)) {
+                members.add(piece);
+            }
+        }
+        return List.copyOf(members);
+    }
+
+    /**
+     * Case-insensitive substring test used by CONTAINS / NOT CONTAINS filters.
+     * A blank expectation matches anything; a blank actual value matches nothing else.
+     * The expectation is trimmed before the comparison.
+     *
+     * @param actualValue   value taken from the log entry; may be {@code null}
+     * @param expectedValue text to look for; may be {@code null}
+     * @return {@code true} if the actual value contains the expected text
+     */
+    private boolean matchesContainedValue(String actualValue, String expectedValue) {
+        if (!StringUtils.hasText(expectedValue)) {
+            return true;
+        }
+        if (!StringUtils.hasText(actualValue)) {
+            return false;
+        }
+        String haystack = actualValue.toLowerCase(Locale.ROOT);
+        String needle = expectedValue.trim().toLowerCase(Locale.ROOT);
+        return haystack.contains(needle);
+    }
+
private String resolveWorkspaceId(Map<String, Object> resource) {
if (resource == null || resource.isEmpty()) {
return null;
diff --git
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
index d029918d38..cee52d2428 100644
---
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
+++
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
@@ -103,6 +103,8 @@ class LogSseControllerTest {
String spanId = "abcdef1234567890";
String entityId = "42";
String entityType = "service";
+ String resourceFilter = "service.version=1.2.3";
+ String attributeFilter = "http.route:/checkout";
// When: A request is made with all filter parameters
mockMvc.perform(get("/api/logs/sse/subscribe")
@@ -112,6 +114,8 @@ class LogSseControllerTest {
.param("spanId", spanId)
.param("entityId", entityId)
.param("entityType", entityType)
+ .param("resourceFilter", resourceFilter)
+ .param("attributeFilter", attributeFilter)
.accept(MediaType.TEXT_EVENT_STREAM_VALUE))
.andExpect(status().isOk());
@@ -125,6 +129,8 @@ class LogSseControllerTest {
Assertions.assertEquals(capturedCriteria.getSpanId(), spanId);
Assertions.assertEquals(capturedCriteria.getEntityId(), entityId);
Assertions.assertEquals(capturedCriteria.getEntityType(), entityType);
+ Assertions.assertEquals(capturedCriteria.getResourceFilter(),
resourceFilter);
+ Assertions.assertEquals(capturedCriteria.getAttributeFilter(),
attributeFilter);
}
@Test
diff --git
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
index e902fcb130..cace7a91c0 100644
---
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
+++
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
@@ -190,6 +190,58 @@ class LogSseFilterCriteriaTest {
assertFalse(filterCriteria.matches(paymentStagingLog));
}
+ // Positive operators: equality (=), IN, the key:value shorthand and CONTAINS, combined
+ // with both clause separators (comma and "and").
+ @Test
+ void testMatchesWithResourceAndAttributeFilters() {
+ // Entry that satisfies every clause of both filters set below.
+ LogEntry checkoutLog = LogEntry.builder()
+ .severityText("INFO")
+ .body("checkout log")
+ .resource(java.util.Map.of(
+ "service.version", "1.2.3",
+ "cloud.region", "us-east-1"))
+ .attributes(java.util.Map.of(
+ "http.route", "/checkout",
+ "error.message", "payment timeout"))
+ .build();
+ // Entry whose version, region, route and message all differ from what the filters expect.
+ LogEntry cartLog = LogEntry.builder()
+ .severityText("INFO")
+ .body("cart log")
+ .resource(java.util.Map.of(
+ "service.version", "1.2.4",
+ "cloud.region", "eu-west-1"))
+ .attributes(java.util.Map.of(
+ "http.route", "/cart",
+ "error.message", "cart timeout"))
+ .build();
+
+ filterCriteria.setResourceFilter("service.version=1.2.3, cloud.region
IN ('us-east-1')");
+ filterCriteria.setAttributeFilter("http.route:/checkout and
error.message CONTAINS payment");
+
+ assertTrue(filterCriteria.matches(checkoutLog));
+ assertFalse(filterCriteria.matches(cartLog));
+ }
+
+ // Exclusion operators: inequality (!=), NOT IN and NOT EXISTS.
+ @Test
+ void testMatchesWithNegativeResourceAndAttributeFilters() {
+ // Neither entry carries an "error.message" attribute, so the NOT EXISTS clause holds for both.
+ LogEntry checkoutLog = LogEntry.builder()
+ .severityText("INFO")
+ .body("checkout log")
+ .resource(java.util.Map.of("service.version", "1.2.3"))
+ .attributes(java.util.Map.of("http.route", "/checkout"))
+ .build();
+ // This entry has exactly the excluded version (1.2.4) and the excluded route (/cart).
+ LogEntry cartLog = LogEntry.builder()
+ .severityText("INFO")
+ .body("cart log")
+ .resource(java.util.Map.of("service.version", "1.2.4"))
+ .attributes(java.util.Map.of("http.route", "/cart"))
+ .build();
+
+ filterCriteria.setResourceFilter("service.version!=1.2.4");
+ filterCriteria.setAttributeFilter("http.route NOT IN ('/cart') and
error.message NOT EXISTS");
+
+ assertTrue(filterCriteria.matches(checkoutLog));
+ assertFalse(filterCriteria.matches(cartLog));
+ }
+
@Test
void testMatchesWithEmptyStringFilters() {
// Test empty string filters
diff --git a/web-next/lib/log-manage/query-state.test.ts
b/web-next/lib/log-manage/query-state.test.ts
index 7476664fa5..2a9dfaca80 100644
--- a/web-next/lib/log-manage/query-state.test.ts
+++ b/web-next/lib/log-manage/query-state.test.ts
@@ -314,7 +314,16 @@ describe('log query state codec', () => {
);
expect(
buildLogStreamUrl(
- { search: 'timeout', logContent: '', traceId: '', spanId: '',
severityNumber: '', severityText: 'ERROR' },
+ {
+ search: 'timeout',
+ logContent: '',
+ traceId: '',
+ spanId: '',
+ severityNumber: '',
+ severityText: 'ERROR',
+ resourceFilter: 'service.version=1.2.3',
+ attributeFilter: 'http.route:/checkout'
+ },
{
entityId: '42',
entityType: 'service',
@@ -324,7 +333,7 @@ describe('log query state codec', () => {
}
)
).toBe(
-
'/api/logs/sse/subscribe?logContent=timeout&severityText=ERROR&entityId=42&entityType=service&serviceName=checkout&serviceNamespace=payments&environment=prod'
+
'/api/logs/sse/subscribe?logContent=timeout&severityText=ERROR&resourceFilter=service.version%3D1.2.3&attributeFilter=http.route%3A%2Fcheckout&entityId=42&entityType=service&serviceName=checkout&serviceNamespace=payments&environment=prod'
);
});
diff --git a/web-next/lib/log-manage/query-state.ts
b/web-next/lib/log-manage/query-state.ts
index f2bbf60ac7..61fae3fbce 100644
--- a/web-next/lib/log-manage/query-state.ts
+++ b/web-next/lib/log-manage/query-state.ts
@@ -443,6 +443,9 @@ export function buildLogStreamUrl(query: LogQueryState,
routeContext: SignalRout
const severityNumber = normalizeSeverityNumberParam(query.severityNumber);
if (severityNumber) params.set('severityNumber', severityNumber);
if (query.severityText.trim()) params.set('severityText',
query.severityText.trim());
+ const resourceFilter = query.resourceFilter?.trim() || '';
+ if (resourceFilter) params.set('resourceFilter', resourceFilter);
+ if (query.attributeFilter?.trim()) params.set('attributeFilter',
query.attributeFilter.trim());
appendLogEntityContextParams(params, routeContext);
appendLogQuickFilterContext(params, routeContext);
const qs = params.toString();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]