This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 467c209995d3ef72044846540d7592e3f13a20e9
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 25 21:04:52 2025 +0800

    Fix topN parsing panic when the criteria is set
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |  1 +
 banyand/internal/storage/segment_test.go           | 15 ++++++--
 banyand/measure/metadata.go                        | 29 +++++++++++++++
 banyand/measure/topn.go                            |  3 ++
 pkg/fs/local_file_system_darwin.go                 | 10 +----
 .../testdata/topn_aggregations/eq_svc1.json        | 28 ++++++++++++++
 .../testdata/topn_aggregations/in_svc2_svc4.json   | 28 ++++++++++++++
 .../testdata/topn_aggregations/ne_svc1.json        | 28 ++++++++++++++
 .../topn_aggregations/not_in_svc2_svc4 copy.json   | 28 ++++++++++++++
 test/cases/topn/data/input/eq.yaml                 | 22 +++++++++++
 test/cases/topn/data/input/in.yaml                 | 22 +++++++++++
 test/cases/topn/data/input/ne.yaml                 | 22 +++++++++++
 test/cases/topn/data/input/not_in.yaml             | 22 +++++++++++
 test/cases/topn/data/want/eq.yaml                  | 43 ++++++++++++++++++++++
 test/cases/topn/data/want/in.yaml                  | 43 ++++++++++++++++++++++
 test/cases/topn/data/want/ne.yaml                  | 42 +++++++++++++++++++++
 test/cases/topn/data/want/not_in.yaml              | 42 +++++++++++++++++++++
 test/cases/topn/topn.go                            |  4 ++
 18 files changed, 421 insertions(+), 11 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f67d57fe..5ebb4d53 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
 - Fix the deadlock issue when loading a closed segment.
 - Fix the issue that the etcd watcher gets the historical node registration 
events.
 - Fix the crash when collecting the metrics from a closed segment.
+- Fix topN parsing panic when the criteria is set.
 
 ## 0.8.0
 
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index b97d34fd..1fc8691a 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -592,9 +592,18 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
 
        // Set the "lastAccessed" time for some segments to make them idle
        // Make the first, third, and fifth segments idle (0, 2, 4)
-       segments[0].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) 
// day1 - expired
-       segments[2].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) 
// day3 - expired
-       segments[4].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) 
// day5 - not expired
+       // Use a time that's definitely older than the idle timeout
+       idleTime := time.Now().Add(-2 * idleTimeout).UnixNano()
+       segments[0].lastAccessed.Store(idleTime) // day1 - expired
+       segments[2].lastAccessed.Store(idleTime) // day3 - expired
+       segments[4].lastAccessed.Store(idleTime) // day5 - not expired
+
+       // Keep segments 1, 3, and 5 active by setting their lastAccessed to 
current time
+       // Use a time that's definitely newer than the idle timeout threshold
+       activeTime := time.Now().UnixNano()
+       segments[1].lastAccessed.Store(activeTime) // day2 - expired but should 
stay open
+       segments[3].lastAccessed.Store(activeTime) // day4 - not expired
+       segments[5].lastAccessed.Store(activeTime) // day6 - not expired
 
        // Close idle segments
        closedCount := sc.closeIdleSegments()
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index a9d87e88..0edad3ac 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "path"
+       "strings"
        "sync"
        "time"
 
@@ -236,6 +237,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                        Kind:     resourceSchema.EventKindGroup,
                        Metadata: g,
                })
+               sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name)
        case schema.KindMeasure:
                m := metadata.Spec.(*databasev1.Measure)
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
@@ -338,6 +340,33 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx 
context.Context, measureSchema
        }
 }
 
+func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) {
+       var keysToDelete []string
+       groupPrefix := groupName + "/"
+
+       sr.topNProcessorMap.Range(func(key, val any) bool {
+               keyStr := key.(string)
+               if strings.HasPrefix(keyStr, groupPrefix) {
+                       keysToDelete = append(keysToDelete, keyStr)
+               }
+               return true
+       })
+
+       for _, key := range keysToDelete {
+               if v, ok := sr.topNProcessorMap.Load(key); ok {
+                       manager := v.(*topNProcessorManager)
+                       if err := manager.Close(); err != nil {
+                               sr.l.Error().Err(err).Str("key", 
key).Msg("failed to close topN processor manager")
+                       }
+                       sr.topNProcessorMap.Delete(key)
+               }
+       }
+
+       if len(keysToDelete) > 0 {
+               sr.l.Info().Str("groupName", groupName).Int("count", 
len(keysToDelete)).Msg("stopped topN processors for group")
+       }
+}
+
 var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index e4a7317f..206cf74a 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -353,6 +353,7 @@ func (manager *topNProcessorManager) init(m 
*databasev1.Measure) {
        manager.m = m
        tagMapSpec := logical.TagSpecMap{}
        tagMapSpec.RegisterTagFamilies(m.GetTagFamilies())
+       manager.s = tagMapSpec
        for i := range manager.registeredTasks {
                if err := manager.start(manager.registeredTasks[i]); err != nil 
{
                        manager.l.Err(err).Msg("fail to start processor")
@@ -369,6 +370,8 @@ func (manager *topNProcessorManager) Close() error {
        }
        manager.processorList = nil
        manager.registeredTasks = nil
+       manager.s = nil
+       manager.m = nil
        return err
 }
 
diff --git a/pkg/fs/local_file_system_darwin.go 
b/pkg/fs/local_file_system_darwin.go
index ae633e84..8fd79aa5 100644
--- a/pkg/fs/local_file_system_darwin.go
+++ b/pkg/fs/local_file_system_darwin.go
@@ -26,8 +26,6 @@ import (
        "syscall"
 
        "golang.org/x/sys/unix"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 // localFileSystem is the implementation of FileSystem interface.
@@ -115,18 +113,14 @@ func CompareINode(srcPath, destPath string) error {
 }
 
 // applyFadviseToFD is a no-op on non-Linux systems.
-func applyFadviseToFD(fd uintptr, offset int64, length int64) error {
+func applyFadviseToFD(_ uintptr, _ int64, _ int64) error {
        return nil
 }
 
 // SyncAndDropCache syncs the file data to disk but doesn't drop it from the 
page cache on macOS.
-func SyncAndDropCache(fd uintptr, offset int64, length int64) error {
+func SyncAndDropCache(fd uintptr, _ int64, _ int64) error {
        if err := unix.FcntlFlock(fd, unix.F_FULLFSYNC, &unix.Flock_t{}); err 
!= nil {
                return err
        }
-
-       logger.GetLogger(moduleName).
-               Debug().
-               Msg("SyncAndDropCache: fullfsync succeeded, page-cache drop 
unsupported on darwin")
        return nil
 }
diff --git a/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json 
b/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json
new file mode 100644
index 00000000..239798fc
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json
@@ -0,0 +1,28 @@
+{
+  "metadata": {
+    "name": "eq_svc1",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "total",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "http.uri"
+  ],
+  "criteria": {
+    "condition":{
+      "name": "service_id",
+      "op": "BINARY_OP_EQ",
+      "value": {
+        "str": {
+          "value": "svc_1"
+        }
+      }
+    }
+  },
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json 
b/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json
new file mode 100644
index 00000000..76077629
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json
@@ -0,0 +1,28 @@
+{
+  "metadata": {
+    "name": "in_svc2_svc4",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "total",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "http.uri"
+  ],
+  "criteria": {
+    "condition":{
+      "name": "service_id",
+      "op": "BINARY_OP_IN",
+      "value": {
+        "str_array": {
+            "value": ["svc_2", "svc_4"]
+        }
+      }
+    }
+  },
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json 
b/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json
new file mode 100644
index 00000000..5289e151
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json
@@ -0,0 +1,28 @@
+{
+  "metadata": {
+    "name": "ne_svc1",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "total",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "http.uri"
+  ],
+  "criteria": {
+    "condition":{
+      "name": "service_id",
+      "op": "BINARY_OP_NE",
+      "value": {
+        "str": {
+          "value": "svc_1"
+        }
+      }
+    }
+  },
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 
copy.json b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 
copy.json
new file mode 100644
index 00000000..d95e3c7e
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json    
@@ -0,0 +1,28 @@
+{
+  "metadata": {
+    "name": "not_in_svc2_svc4",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "total",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "http.uri"
+  ],
+  "criteria": {
+    "condition":{
+      "name": "service_id",
+      "op": "BINARY_OP_NOT_IN",
+      "value": {
+        "str_array": {
+            "value": ["svc_2", "svc_4"]
+        }
+      }
+    }
+  },
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file
diff --git a/test/cases/topn/data/input/eq.yaml 
b/test/cases/topn/data/input/eq.yaml
new file mode 100644
index 00000000..93e5bc6d
--- /dev/null
+++ b/test/cases/topn/data/input/eq.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "eq_svc1"
+groups: ["sw_metric"]
+topN: 2
+fieldValueSort: 1
+agg: 2
diff --git a/test/cases/topn/data/input/in.yaml 
b/test/cases/topn/data/input/in.yaml
new file mode 100644
index 00000000..28b6cf14
--- /dev/null
+++ b/test/cases/topn/data/input/in.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "in_svc2_svc4"
+groups: ["sw_metric"]
+topN: 2
+fieldValueSort: 1
+agg: 2
diff --git a/test/cases/topn/data/input/ne.yaml 
b/test/cases/topn/data/input/ne.yaml
new file mode 100644
index 00000000..ee10a6d9
--- /dev/null
+++ b/test/cases/topn/data/input/ne.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "ne_svc1"
+groups: ["sw_metric"]
+topN: 2
+fieldValueSort: 1
+agg: 2
diff --git a/test/cases/topn/data/input/not_in.yaml 
b/test/cases/topn/data/input/not_in.yaml
new file mode 100644
index 00000000..b72e6db0
--- /dev/null
+++ b/test/cases/topn/data/input/not_in.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "not_in_svc2_svc4"
+groups: ["sw_metric"]
+topN: 2
+fieldValueSort: 1
+agg: 2
diff --git a/test/cases/topn/data/want/eq.yaml 
b/test/cases/topn/data/want/eq.yaml
new file mode 100644
index 00000000..a9fe852a
--- /dev/null
+++ b/test/cases/topn/data/want/eq.yaml
@@ -0,0 +1,43 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+- items:
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "240"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
+    value:
+      int:
+        value: "133"
diff --git a/test/cases/topn/data/want/in.yaml 
b/test/cases/topn/data/want/in.yaml
new file mode 100644
index 00000000..169ef690
--- /dev/null
+++ b/test/cases/topn/data/want/in.yaml
@@ -0,0 +1,43 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+- items:
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
+    value:
+      int:
+        value: "155"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_4
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
+    value:
+      int:
+        value: "130"
diff --git a/test/cases/topn/data/want/ne.yaml 
b/test/cases/topn/data/want/ne.yaml
new file mode 100644
index 00000000..43727cd9
--- /dev/null
+++ b/test/cases/topn/data/want/ne.yaml
@@ -0,0 +1,42 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+- items:
+  - entity:
+    - key: service_id
+      value:
+        "null": null
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "300"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
+    value:
+      int:
+        value: "155"
diff --git a/test/cases/topn/data/want/not_in.yaml 
b/test/cases/topn/data/want/not_in.yaml
new file mode 100644
index 00000000..9ccb5d62
--- /dev/null
+++ b/test/cases/topn/data/want/not_in.yaml
@@ -0,0 +1,42 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+- items:
+  - entity:
+    - key: service_id
+      value:
+        "null": null
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "300"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "240"
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index 357eeaeb..bcb1d39b 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -44,4 +44,8 @@ var _ = g.DescribeTable("TopN Tests", verify,
        g.Entry("max top3 with condition order by desc", helpers.Args{Input: 
"condition_aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("max top3 for null group order by desc", helpers.Args{Input: 
"null_group", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("multi-group: max top3 order by desc", helpers.Args{Input: 
"multi_group_aggr_desc", Want: "aggr_desc", Duration: 25 * time.Minute, Offset: 
-20 * time.Minute}),
+       g.Entry("using equal in aggregation", helpers.Args{Input: "eq", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("using not equal in aggregation", helpers.Args{Input: "ne", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("using in operation in aggregation", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("using not-in operation in aggregation", helpers.Args{Input: 
"not_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )

Reply via email to