This is an automated email from the ASF dual-hosted git repository.

zqr10159 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/2.0.0 by this push:
     new 6e3bc0e1f3 Preserve log stream attribute filters
6e3bc0e1f3 is described below

commit 6e3bc0e1f38a8d69f020e567597b89aada0a80d2
Author: Logic <[email protected]>
AuthorDate: Wed Jun 10 08:34:47 2026 +0800

    Preserve log stream attribute filters
---
 .../logs/sse/LogSseFilterCriteria.java             | 293 +++++++++++++++++++++
 .../logs/controller/LogSseControllerTest.java      |   6 +
 .../logs/sse/LogSseFilterCriteriaTest.java         |  52 ++++
 web-next/lib/log-manage/query-state.test.ts        |  13 +-
 web-next/lib/log-manage/query-state.ts             |   3 +
 5 files changed, 365 insertions(+), 2 deletions(-)

diff --git a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
index 20f245ecca..844aa9c684 100644
--- a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
+++ b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteria.java
@@ -19,8 +19,15 @@
 
 package org.apache.hertzbeat.observability.logs.sse;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import lombok.Data;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
@@ -40,6 +47,24 @@ import static io.swagger.v3.oas.annotations.media.Schema.AccessMode.READ_ONLY;
 @Schema(description = "Log filtering criteria for SSE (Server-Sent Events) log 
streaming")
 public class LogSseFilterCriteria {
 
+    private static final String LOG_FILTER_NEGATION_PREFIX = "!";
+    private static final String LOG_FILTER_IN_PREFIX = "__in__:";
+    private static final String LOG_FILTER_NOT_IN_PREFIX = "__not_in__:";
+    private static final String LOG_FILTER_CONTAINS_PREFIX = "__contains__:";
+    private static final String LOG_FILTER_NOT_CONTAINS_PREFIX = 
"__not_contains__:";
+    private static final String LOG_FILTER_EXISTS_PREFIX = "__exists__";
+    private static final String LOG_FILTER_NOT_EXISTS_PREFIX = 
"__not_exists__";
+    private static final String LOG_FILTER_VALUE_DELIMITER = "\u001F";
+    private static final Pattern LOG_FILTER_LIST_OPERATOR_PATTERN = 
Pattern.compile(
+            "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+IN|IN)\\s*(\\(.+\\))$",
+            Pattern.CASE_INSENSITIVE);
+    private static final Pattern LOG_FILTER_TEXT_OPERATOR_PATTERN = 
Pattern.compile(
+            "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+CONTAINS|CONTAINS)\\s+(.+)$",
+            Pattern.CASE_INSENSITIVE);
+    private static final Pattern LOG_FILTER_PRESENCE_OPERATOR_PATTERN = 
Pattern.compile(
+            "^([A-Za-z0-9_.:-]+)\\s+(NOT\\s+EXISTS|EXISTS)$",
+            Pattern.CASE_INSENSITIVE);
+
     private static final Set<String> WORKSPACE_RESOURCE_KEYS = Set.of(
             "hertzbeat.workspace_id",
             AuthTokenScopes.CLAIM_WORKSPACE_ID,
@@ -97,6 +122,12 @@ public class LogSseFilterCriteria {
     @Schema(description = "HertzBeat entity type resource attribute.", example 
= "service", accessMode = READ_WRITE)
     private String entityType;
 
+    @Schema(description = "Resource attribute filter expression, for example 
service.version=1.2.3", accessMode = READ_WRITE)
+    private String resourceFilter;
+
+    @Schema(description = "Log attribute filter expression, for example 
http.route:/checkout", accessMode = READ_WRITE)
+    private String attributeFilter;
+
     /**
      * Workspace boundary captured from the authenticated request.
      */
@@ -157,6 +188,12 @@ public class LogSseFilterCriteria {
         if (!matchesServiceContext(log)) {
             return false;
         }
+        if (!matchesAttributes(log.getResource(), 
parseLogAttributeFilter(resourceFilter))) {
+            return false;
+        }
+        if (!matchesAttributes(log.getAttributes(), 
parseLogAttributeFilter(attributeFilter))) {
+            return false;
+        }
         return true;
     }
 
@@ -212,6 +249,262 @@ public class LogSseFilterCriteria {
         return null;
     }
 
+    private Map<String, String> parseLogAttributeFilter(String 
filterExpression) {
+        if (!StringUtils.hasText(filterExpression)) {
+            return Collections.emptyMap();
+        }
+        Map<String, String> filters = new LinkedHashMap<>();
+        for (String token : splitLogFilterClauses(filterExpression)) {
+            if (!StringUtils.hasText(token)) {
+                continue;
+            }
+            if (appendLogFilterListValues(filters, token)
+                    || appendLogFilterTextValue(filters, token)
+                    || appendLogFilterPresenceValue(filters, token)) {
+                continue;
+            }
+            boolean negate = false;
+            int separatorIndex = token.indexOf("!=");
+            if (separatorIndex >= 0) {
+                negate = true;
+            } else {
+                separatorIndex = token.indexOf('=');
+            }
+            if (separatorIndex < 0) {
+                separatorIndex = token.indexOf(':');
+            }
+            if (separatorIndex <= 0 || separatorIndex >= token.length() - 1) {
+                continue;
+            }
+            String key = token.substring(0, separatorIndex).trim();
+            String value = stripFilterQuotes(token.substring(separatorIndex + 
(negate ? 2 : 1)).trim());
+            if (!isSafeAttributeKey(key) || !StringUtils.hasText(value)) {
+                continue;
+            }
+            filters.put(key, negate ? LOG_FILTER_NEGATION_PREFIX + value : 
value);
+        }
+        return filters.isEmpty() ? Collections.emptyMap() : 
Map.copyOf(filters);
+    }
+
+    private boolean appendLogFilterListValues(Map<String, String> filters, 
String token) {
+        Matcher matcher = LOG_FILTER_LIST_OPERATOR_PATTERN.matcher(token);
+        if (!matcher.matches()) {
+            return false;
+        }
+        String key = matcher.group(1).trim();
+        String operator = matcher.group(2).trim().replaceAll("\\s+", " ");
+        String valueList = matcher.group(3).trim();
+        if (!isSafeAttributeKey(key) || valueList.length() < 2
+                || !valueList.startsWith("(") || !valueList.endsWith(")")) {
+            return false;
+        }
+        List<String> values = splitLogFilterListValues(valueList.substring(1, 
valueList.length() - 1)).stream()
+                .map(value -> stripFilterQuotes(value.trim()))
+                .filter(StringUtils::hasText)
+                .distinct()
+                .toList();
+        if (values.isEmpty()) {
+            return false;
+        }
+        String prefix = "not in".equalsIgnoreCase(operator) ? 
LOG_FILTER_NOT_IN_PREFIX : LOG_FILTER_IN_PREFIX;
+        filters.put(key, prefix + String.join(LOG_FILTER_VALUE_DELIMITER, 
values));
+        return true;
+    }
+
+    private boolean appendLogFilterTextValue(Map<String, String> filters, 
String token) {
+        Matcher matcher = LOG_FILTER_TEXT_OPERATOR_PATTERN.matcher(token);
+        if (!matcher.matches()) {
+            return false;
+        }
+        String key = matcher.group(1).trim();
+        String operator = matcher.group(2).trim().replaceAll("\\s+", " ");
+        String value = stripFilterQuotes(matcher.group(3).trim());
+        if (!isSafeAttributeKey(key) || !StringUtils.hasText(value)) {
+            return false;
+        }
+        filters.put(key, "not contains".equalsIgnoreCase(operator)
+                ? LOG_FILTER_NOT_CONTAINS_PREFIX + value
+                : LOG_FILTER_CONTAINS_PREFIX + value);
+        return true;
+    }
+
+    private boolean appendLogFilterPresenceValue(Map<String, String> filters, 
String token) {
+        Matcher matcher = LOG_FILTER_PRESENCE_OPERATOR_PATTERN.matcher(token);
+        if (!matcher.matches()) {
+            return false;
+        }
+        String key = matcher.group(1).trim();
+        String operator = matcher.group(2).trim().replaceAll("\\s+", " ");
+        if (!isSafeAttributeKey(key)) {
+            return false;
+        }
+        filters.put(key, "not exists".equalsIgnoreCase(operator)
+                ? LOG_FILTER_NOT_EXISTS_PREFIX
+                : LOG_FILTER_EXISTS_PREFIX);
+        return true;
+    }
+
+    private List<String> splitLogFilterClauses(String filterExpression) {
+        List<String> clauses = new ArrayList<>();
+        StringBuilder current = new StringBuilder();
+        int depth = 0;
+        char quote = 0;
+        for (int index = 0; index < filterExpression.length(); index++) {
+            char character = filterExpression.charAt(index);
+            if (quote != 0) {
+                current.append(character);
+                if (character == quote) {
+                    quote = 0;
+                }
+                continue;
+            }
+            if (character == '\'' || character == '"') {
+                quote = character;
+                current.append(character);
+                continue;
+            }
+            if (character == '(') {
+                depth++;
+                current.append(character);
+                continue;
+            }
+            if (character == ')') {
+                depth = Math.max(0, depth - 1);
+                current.append(character);
+                continue;
+            }
+            if (depth == 0 && (character == ',' || 
isLogFilterAndDelimiter(filterExpression, index))) {
+                addLogFilterClause(clauses, current);
+                if (character != ',') {
+                    index += 4;
+                }
+                continue;
+            }
+            current.append(character);
+        }
+        addLogFilterClause(clauses, current);
+        return clauses;
+    }
+
+    private List<String> splitLogFilterListValues(String values) {
+        List<String> result = new ArrayList<>();
+        StringBuilder current = new StringBuilder();
+        char quote = 0;
+        for (int index = 0; index < values.length(); index++) {
+            char character = values.charAt(index);
+            if (quote != 0) {
+                current.append(character);
+                if (character == quote) {
+                    quote = 0;
+                }
+                continue;
+            }
+            if (character == '\'' || character == '"') {
+                quote = character;
+                current.append(character);
+                continue;
+            }
+            if (character == ',') {
+                addLogFilterClause(result, current);
+                continue;
+            }
+            current.append(character);
+        }
+        addLogFilterClause(result, current);
+        return result;
+    }
+
+    private void addLogFilterClause(List<String> clauses, StringBuilder 
current) {
+        String clause = current.toString().trim();
+        if (StringUtils.hasText(clause)) {
+            clauses.add(clause);
+        }
+        current.setLength(0);
+    }
+
+    private boolean isLogFilterAndDelimiter(String value, int index) {
+        return index + 5 <= value.length() && value.regionMatches(true, index, 
" and ", 0, 5);
+    }
+
+    private String stripFilterQuotes(String value) {
+        if (value.length() < 2) {
+            return value;
+        }
+        char first = value.charAt(0);
+        char last = value.charAt(value.length() - 1);
+        if ((first == '\'' && last == '\'') || (first == '"' && last == '"')) {
+            return value.substring(1, value.length() - 1).trim();
+        }
+        return value;
+    }
+
+    private boolean isSafeAttributeKey(String key) {
+        return StringUtils.hasText(key) && key.matches("[A-Za-z0-9_.:-]+");
+    }
+
+    private boolean matchesAttributes(Map<String, Object> source, Map<String, 
String> expectedAttributes) {
+        if (expectedAttributes == null || expectedAttributes.isEmpty()) {
+            return true;
+        }
+        if (source == null || source.isEmpty()) {
+            return 
expectedAttributes.values().stream().allMatch(this::isExclusionLogAttributeFilter);
+        }
+        return expectedAttributes.entrySet().stream()
+                .allMatch(entry -> matchesAttributeFilter(resolveValue(source, 
entry.getKey()), entry.getValue(),
+                        source.containsKey(entry.getKey())));
+    }
+
+    private boolean matchesAttributeFilter(String actualValue, String 
expectedValue, boolean keyExists) {
+        if (LOG_FILTER_EXISTS_PREFIX.equals(expectedValue)) {
+            return keyExists;
+        }
+        if (LOG_FILTER_NOT_EXISTS_PREFIX.equals(expectedValue)) {
+            return !keyExists;
+        }
+        if (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_IN_PREFIX)) {
+            return 
splitListLogAttributeValues(expectedValue.substring(LOG_FILTER_IN_PREFIX.length())).stream()
+                    .anyMatch(expected -> matchesOptionalValue(actualValue, 
expected));
+        }
+        if (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NOT_IN_PREFIX)) {
+            return 
splitListLogAttributeValues(expectedValue.substring(LOG_FILTER_NOT_IN_PREFIX.length())).stream()
+                    .noneMatch(expected -> matchesOptionalValue(actualValue, 
expected));
+        }
+        if (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_CONTAINS_PREFIX)) {
+            return matchesContainedValue(actualValue, 
expectedValue.substring(LOG_FILTER_CONTAINS_PREFIX.length()));
+        }
+        if (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NOT_CONTAINS_PREFIX)) {
+            return !matchesContainedValue(actualValue, 
expectedValue.substring(LOG_FILTER_NOT_CONTAINS_PREFIX.length()));
+        }
+        if (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NEGATION_PREFIX)) {
+            return !matchesOptionalValue(actualValue, 
expectedValue.substring(LOG_FILTER_NEGATION_PREFIX.length()));
+        }
+        return matchesOptionalValue(actualValue, expectedValue);
+    }
+
+    private boolean isExclusionLogAttributeFilter(String expectedValue) {
+        return (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NEGATION_PREFIX))
+                || (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NOT_IN_PREFIX))
+                || (expectedValue != null && 
expectedValue.startsWith(LOG_FILTER_NOT_CONTAINS_PREFIX))
+                || LOG_FILTER_NOT_EXISTS_PREFIX.equals(expectedValue);
+    }
+
+    private List<String> splitListLogAttributeValues(String encodedValues) {
+        if (!StringUtils.hasText(encodedValues)) {
+            return List.of();
+        }
+        return 
List.of(encodedValues.split(Pattern.quote(LOG_FILTER_VALUE_DELIMITER), 
-1)).stream()
+                .filter(StringUtils::hasText)
+                .toList();
+    }
+
+    private boolean matchesContainedValue(String actualValue, String 
expectedValue) {
+        if (!StringUtils.hasText(expectedValue)) {
+            return true;
+        }
+        return StringUtils.hasText(actualValue)
+                && 
actualValue.toLowerCase(Locale.ROOT).contains(expectedValue.trim().toLowerCase(Locale.ROOT));
+    }
+
     private String resolveWorkspaceId(Map<String, Object> resource) {
         if (resource == null || resource.isEmpty()) {
             return null;
diff --git a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
index d029918d38..cee52d2428 100644
--- a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
+++ b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/controller/LogSseControllerTest.java
@@ -103,6 +103,8 @@ class LogSseControllerTest {
         String spanId = "abcdef1234567890";
         String entityId = "42";
         String entityType = "service";
+        String resourceFilter = "service.version=1.2.3";
+        String attributeFilter = "http.route:/checkout";
 
         // When: A request is made with all filter parameters
         mockMvc.perform(get("/api/logs/sse/subscribe")
@@ -112,6 +114,8 @@ class LogSseControllerTest {
                         .param("spanId", spanId)
                         .param("entityId", entityId)
                         .param("entityType", entityType)
+                        .param("resourceFilter", resourceFilter)
+                        .param("attributeFilter", attributeFilter)
                         .accept(MediaType.TEXT_EVENT_STREAM_VALUE))
                 .andExpect(status().isOk());
 
@@ -125,6 +129,8 @@ class LogSseControllerTest {
         Assertions.assertEquals(capturedCriteria.getSpanId(), spanId);
         Assertions.assertEquals(capturedCriteria.getEntityId(), entityId);
         Assertions.assertEquals(capturedCriteria.getEntityType(), entityType);
+        Assertions.assertEquals(capturedCriteria.getResourceFilter(), 
resourceFilter);
+        Assertions.assertEquals(capturedCriteria.getAttributeFilter(), 
attributeFilter);
     }
 
     @Test
diff --git a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
index e902fcb130..cace7a91c0 100644
--- a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
+++ b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/logs/sse/LogSseFilterCriteriaTest.java
@@ -190,6 +190,58 @@ class LogSseFilterCriteriaTest {
         assertFalse(filterCriteria.matches(paymentStagingLog));
     }
 
+    @Test
+    void testMatchesWithResourceAndAttributeFilters() {
+        LogEntry checkoutLog = LogEntry.builder()
+                .severityText("INFO")
+                .body("checkout log")
+                .resource(java.util.Map.of(
+                        "service.version", "1.2.3",
+                        "cloud.region", "us-east-1"))
+                .attributes(java.util.Map.of(
+                        "http.route", "/checkout",
+                        "error.message", "payment timeout"))
+                .build();
+        LogEntry cartLog = LogEntry.builder()
+                .severityText("INFO")
+                .body("cart log")
+                .resource(java.util.Map.of(
+                        "service.version", "1.2.4",
+                        "cloud.region", "eu-west-1"))
+                .attributes(java.util.Map.of(
+                        "http.route", "/cart",
+                        "error.message", "cart timeout"))
+                .build();
+
+        filterCriteria.setResourceFilter("service.version=1.2.3, cloud.region 
IN ('us-east-1')");
+        filterCriteria.setAttributeFilter("http.route:/checkout and 
error.message CONTAINS payment");
+
+        assertTrue(filterCriteria.matches(checkoutLog));
+        assertFalse(filterCriteria.matches(cartLog));
+    }
+
+    @Test
+    void testMatchesWithNegativeResourceAndAttributeFilters() {
+        LogEntry checkoutLog = LogEntry.builder()
+                .severityText("INFO")
+                .body("checkout log")
+                .resource(java.util.Map.of("service.version", "1.2.3"))
+                .attributes(java.util.Map.of("http.route", "/checkout"))
+                .build();
+        LogEntry cartLog = LogEntry.builder()
+                .severityText("INFO")
+                .body("cart log")
+                .resource(java.util.Map.of("service.version", "1.2.4"))
+                .attributes(java.util.Map.of("http.route", "/cart"))
+                .build();
+
+        filterCriteria.setResourceFilter("service.version!=1.2.4");
+        filterCriteria.setAttributeFilter("http.route NOT IN ('/cart') and 
error.message NOT EXISTS");
+
+        assertTrue(filterCriteria.matches(checkoutLog));
+        assertFalse(filterCriteria.matches(cartLog));
+    }
+
     @Test
     void testMatchesWithEmptyStringFilters() {
         // Test empty string filters
diff --git a/web-next/lib/log-manage/query-state.test.ts b/web-next/lib/log-manage/query-state.test.ts
index 7476664fa5..2a9dfaca80 100644
--- a/web-next/lib/log-manage/query-state.test.ts
+++ b/web-next/lib/log-manage/query-state.test.ts
@@ -314,7 +314,16 @@ describe('log query state codec', () => {
     );
     expect(
       buildLogStreamUrl(
-        { search: 'timeout', logContent: '', traceId: '', spanId: '', 
severityNumber: '', severityText: 'ERROR' },
+        {
+          search: 'timeout',
+          logContent: '',
+          traceId: '',
+          spanId: '',
+          severityNumber: '',
+          severityText: 'ERROR',
+          resourceFilter: 'service.version=1.2.3',
+          attributeFilter: 'http.route:/checkout'
+        },
         {
           entityId: '42',
           entityType: 'service',
@@ -324,7 +333,7 @@ describe('log query state codec', () => {
         }
       )
     ).toBe(
-      
'/api/logs/sse/subscribe?logContent=timeout&severityText=ERROR&entityId=42&entityType=service&serviceName=checkout&serviceNamespace=payments&environment=prod'
+      
'/api/logs/sse/subscribe?logContent=timeout&severityText=ERROR&resourceFilter=service.version%3D1.2.3&attributeFilter=http.route%3A%2Fcheckout&entityId=42&entityType=service&serviceName=checkout&serviceNamespace=payments&environment=prod'
     );
   });
 
diff --git a/web-next/lib/log-manage/query-state.ts b/web-next/lib/log-manage/query-state.ts
index f2bbf60ac7..61fae3fbce 100644
--- a/web-next/lib/log-manage/query-state.ts
+++ b/web-next/lib/log-manage/query-state.ts
@@ -443,6 +443,9 @@ export function buildLogStreamUrl(query: LogQueryState, routeContext: SignalRout
   const severityNumber = normalizeSeverityNumberParam(query.severityNumber);
   if (severityNumber) params.set('severityNumber', severityNumber);
   if (query.severityText.trim()) params.set('severityText', 
query.severityText.trim());
+  const resourceFilter = query.resourceFilter?.trim() || '';
+  if (resourceFilter) params.set('resourceFilter', resourceFilter);
+  if (query.attributeFilter?.trim()) params.set('attributeFilter', 
query.attributeFilter.trim());
   appendLogEntityContextParams(params, routeContext);
   appendLogQuickFilterContext(params, routeContext);
   const qs = params.toString();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to