This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 09c558b97b853dfcfa6e2b82d6b5740b29bdf576
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jun 25 21:04:52 2025 +0800

    Fix topN parsing panic when the criteria is set
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/measure/metadata.go                        | 29 ++++++++++++++++++++++
 .../topn_aggregations/not_in_svc2_svc4 copy.json   | 28 +++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index a9d87e88..0edad3ac 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "path"
+       "strings"
        "sync"
        "time"
 
@@ -236,6 +237,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                        Kind:     resourceSchema.EventKindGroup,
                        Metadata: g,
                })
+               sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name)
        case schema.KindMeasure:
                m := metadata.Spec.(*databasev1.Measure)
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
@@ -338,6 +340,33 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchema
        }
 }
 
+// stopAllProcessorsWithGroupPrefix closes and removes every topN processor
+// manager stored in topNProcessorMap under a key starting with groupName + "/".
+func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) {
+       groupPrefix := groupName + "/"
+       stopped := 0
+       sr.topNProcessorMap.Range(func(key, _ any) bool {
+               keyStr, isStr := key.(string)
+               if !isStr || !strings.HasPrefix(keyStr, groupPrefix) {
+                       return true
+               }
+               // LoadAndDelete (topNProcessorMap is presumably a sync.Map) claims the
+               // entry atomically, so a manager is closed once and never deleted unclosed.
+               v, loaded := sr.topNProcessorMap.LoadAndDelete(key)
+               if !loaded {
+                       return true
+               }
+               stopped++
+               if err := v.(*topNProcessorManager).Close(); err != nil {
+                       sr.l.Error().Err(err).Str("key", keyStr).Msg("failed to close topN processor manager")
+               }
+               return true
+       })
+       if stopped > 0 {
+               sr.l.Info().Str("groupName", groupName).Int("count", stopped).Msg("stopped topN processors for group")
+       }
+}
+
 var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
diff --git a/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json
new file mode 100644
index 00000000..d95e3c7e
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json    
@@ -0,0 +1,28 @@
+{
+  "metadata": {
+    "name": "not_in_svc2_svc4",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "total",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "http.uri"
+  ],
+  "criteria": {
+    "condition":{
+      "name": "service_id",
+      "op": "BINARY_OP_NOT_IN",
+      "value": {
+        "str_array": {
+            "value": ["svc_2", "svc_4"]
+        }
+      }
+    }
+  },
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file

Reply via email to