This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch topn in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 467c209995d3ef72044846540d7592e3f13a20e9 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Jun 25 21:04:52 2025 +0800 Fix topN parsing panic when the criteria is set Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + banyand/internal/storage/segment_test.go | 15 ++++++-- banyand/measure/metadata.go | 29 +++++++++++++++ banyand/measure/topn.go | 3 ++ pkg/fs/local_file_system_darwin.go | 10 +---- .../testdata/topn_aggregations/eq_svc1.json | 28 ++++++++++++++ .../testdata/topn_aggregations/in_svc2_svc4.json | 28 ++++++++++++++ .../testdata/topn_aggregations/ne_svc1.json | 28 ++++++++++++++ .../topn_aggregations/not_in_svc2_svc4 copy.json | 28 ++++++++++++++ test/cases/topn/data/input/eq.yaml | 22 +++++++++++ test/cases/topn/data/input/in.yaml | 22 +++++++++++ test/cases/topn/data/input/ne.yaml | 22 +++++++++++ test/cases/topn/data/input/not_in.yaml | 22 +++++++++++ test/cases/topn/data/want/eq.yaml | 43 ++++++++++++++++++++++ test/cases/topn/data/want/in.yaml | 43 ++++++++++++++++++++++ test/cases/topn/data/want/ne.yaml | 42 +++++++++++++++++++++ test/cases/topn/data/want/not_in.yaml | 42 +++++++++++++++++++++ test/cases/topn/topn.go | 4 ++ 18 files changed, 421 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f67d57fe..5ebb4d53 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,6 +24,7 @@ Release Notes. - Fix the deadlock issue when loading a closed segment. - Fix the issue that the etcd watcher gets the historical node registration events. - Fix the crash when collecting the metrics from a closed segment. +- Fix topN parsing panic when the criteria is set. ## 0.8.0 diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index b97d34fd..1fc8691a 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -592,9 +592,18 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) { // Set the "lastAccessed" time for some segments to make them idle // Make the first, third, and fifth segments idle (0, 2, 4) - segments[0].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) // day1 - expired - segments[2].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) // day3 - expired - segments[4].lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) // day5 - not expired + // Use a time that's definitely older than the idle timeout + idleTime := time.Now().Add(-2 * idleTimeout).UnixNano() + segments[0].lastAccessed.Store(idleTime) // day1 - expired + segments[2].lastAccessed.Store(idleTime) // day3 - expired + segments[4].lastAccessed.Store(idleTime) // day5 - not expired + + // Keep segments 1, 3, and 5 active by setting their lastAccessed to current time + // Use a time that's definitely newer than the idle timeout threshold + activeTime := time.Now().UnixNano() + segments[1].lastAccessed.Store(activeTime) // day2 - expired but should stay open + segments[3].lastAccessed.Store(activeTime) // day4 - not expired + segments[5].lastAccessed.Store(activeTime) // day6 - not expired // Close idle segments closedCount := sc.closeIdleSegments() diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index a9d87e88..0edad3ac 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "path" + "strings" "sync" "time" @@ -236,6 +237,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) { Kind: resourceSchema.EventKindGroup, Metadata: g, }) + sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name) case schema.KindMeasure: m := metadata.Spec.(*databasev1.Measure) sr.SendMetadataEvent(resourceSchema.MetadataEvent{ @@ -338,6 +340,33 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchema } } +func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) { + var keysToDelete []string + groupPrefix := groupName + "/" + + sr.topNProcessorMap.Range(func(key, val any) bool { + keyStr := key.(string) + if strings.HasPrefix(keyStr, groupPrefix) { + keysToDelete = append(keysToDelete, keyStr) + } + return true + }) + + for _, key := range keysToDelete { + if v, ok := sr.topNProcessorMap.Load(key); ok { + manager := v.(*topNProcessorManager) + if err := manager.Close(); err != nil { + sr.l.Error().Err(err).Str("key", key).Msg("failed to close topN processor manager") + } + sr.topNProcessorMap.Delete(key) + } + } + + if len(keysToDelete) > 0 { + sr.l.Info().Str("groupName", groupName).Int("count", len(keysToDelete)).Msg("stopped topN processors for group") + } +} + var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index e4a7317f..206cf74a 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -353,6 +353,7 @@ func (manager *topNProcessorManager) init(m *databasev1.Measure) { manager.m = m tagMapSpec := logical.TagSpecMap{} tagMapSpec.RegisterTagFamilies(m.GetTagFamilies()) + manager.s = tagMapSpec for i := range manager.registeredTasks { if err := manager.start(manager.registeredTasks[i]); err != nil { manager.l.Err(err).Msg("fail to start processor") @@ -369,6 +370,8 @@ func (manager *topNProcessorManager) Close() error { } manager.processorList = nil manager.registeredTasks = nil + manager.s = nil + manager.m = nil return err } diff --git a/pkg/fs/local_file_system_darwin.go b/pkg/fs/local_file_system_darwin.go index ae633e84..8fd79aa5 100644 --- a/pkg/fs/local_file_system_darwin.go +++ b/pkg/fs/local_file_system_darwin.go @@ -26,8 +26,6 @@ import ( "syscall" "golang.org/x/sys/unix" - - "github.com/apache/skywalking-banyandb/pkg/logger" ) // localFileSystem is the implementation of FileSystem interface. @@ -115,18 +113,14 @@ func CompareINode(srcPath, destPath string) error { } // applyFadviseToFD is a no-op on non-Linux systems. -func applyFadviseToFD(fd uintptr, offset int64, length int64) error { +func applyFadviseToFD(_ uintptr, _ int64, _ int64) error { return nil } // SyncAndDropCache syncs the file data to disk but doesn't drop it from the page cache on macOS. -func SyncAndDropCache(fd uintptr, offset int64, length int64) error { +func SyncAndDropCache(fd uintptr, _ int64, _ int64) error { if err := unix.FcntlFlock(fd, unix.F_FULLFSYNC, &unix.Flock_t{}); err != nil { return err } - - logger.GetLogger(moduleName). - Debug(). - Msg("SyncAndDropCache: fullfsync succeeded, page-cache drop unsupported on darwin") return nil } diff --git a/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json b/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json new file mode 100644 index 00000000..239798fc --- /dev/null +++ b/pkg/test/measure/testdata/topn_aggregations/eq_svc1.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "eq_svc1", + "group": "sw_metric" + }, + "source_measure": { + "name": "service_instance_endpoint_cpm_minute", + "group": "sw_metric" + }, + "field_name": "total", + "field_value_sort": 0, + "group_by_tag_names": [ + "http.uri" + ], + "criteria": { + "condition":{ + "name": "service_id", + "op": "BINARY_OP_EQ", + "value": { + "str": { + "value": "svc_1" + } + } + } + }, + "counters_number": 1000, + "lru_size": 10 +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json b/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json new file mode 100644 index 00000000..76077629 --- /dev/null +++ b/pkg/test/measure/testdata/topn_aggregations/in_svc2_svc4.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "in_svc2_svc4", + "group": "sw_metric" + }, + "source_measure": { + "name": "service_instance_endpoint_cpm_minute", + "group": "sw_metric" + }, + "field_name": "total", + "field_value_sort": 0, + "group_by_tag_names": [ + "http.uri" + ], + "criteria": { + "condition":{ + "name": "service_id", + "op": "BINARY_OP_IN", + "value": { + "str_array": { + "value": ["svc_2", "svc_4"] + } + } + } + }, + "counters_number": 1000, + "lru_size": 10 +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json b/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json new file mode 100644 index 00000000..5289e151 --- /dev/null +++ b/pkg/test/measure/testdata/topn_aggregations/ne_svc1.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "ne_svc1", + "group": "sw_metric" + }, + "source_measure": { + "name": "service_instance_endpoint_cpm_minute", + "group": "sw_metric" + }, + "field_name": "total", + "field_value_sort": 0, + "group_by_tag_names": [ + "http.uri" + ], + "criteria": { + "condition":{ + "name": "service_id", + "op": "BINARY_OP_NE", + "value": { + "str": { + "value": "svc_1" + } + } + } + }, + "counters_number": 1000, + "lru_size": 10 +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json new file mode 100644 index 00000000..d95e3c7e --- /dev/null +++ b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "not_in_svc2_svc4", + "group": "sw_metric" + }, + "source_measure": { + "name": "service_instance_endpoint_cpm_minute", + "group": "sw_metric" + }, + "field_name": "total", + "field_value_sort": 0, + "group_by_tag_names": [ + "http.uri" + ], + "criteria": { + "condition":{ + "name": "service_id", + "op": "BINARY_OP_NOT_IN", + "value": { + "str_array": { + "value": ["svc_2", "svc_4"] + } + } + } + }, + "counters_number": 1000, + "lru_size": 10 +} \ No newline at end of file diff --git a/test/cases/topn/data/input/eq.yaml b/test/cases/topn/data/input/eq.yaml new file mode 100644 index 00000000..93e5bc6d --- /dev/null +++ b/test/cases/topn/data/input/eq.yaml @@ -0,0 +1,22 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "eq_svc1" +groups: ["sw_metric"] +topN: 2 +fieldValueSort: 1 +agg: 2 diff --git a/test/cases/topn/data/input/in.yaml b/test/cases/topn/data/input/in.yaml new file mode 100644 index 00000000..28b6cf14 --- /dev/null +++ b/test/cases/topn/data/input/in.yaml @@ -0,0 +1,22 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "in_svc2_svc4" +groups: ["sw_metric"] +topN: 2 +fieldValueSort: 1 +agg: 2 diff --git a/test/cases/topn/data/input/ne.yaml b/test/cases/topn/data/input/ne.yaml new file mode 100644 index 00000000..ee10a6d9 --- /dev/null +++ b/test/cases/topn/data/input/ne.yaml @@ -0,0 +1,22 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "ne_svc1" +groups: ["sw_metric"] +topN: 2 +fieldValueSort: 1 +agg: 2 diff --git a/test/cases/topn/data/input/not_in.yaml b/test/cases/topn/data/input/not_in.yaml new file mode 100644 index 00000000..b72e6db0 --- /dev/null +++ b/test/cases/topn/data/input/not_in.yaml @@ -0,0 +1,22 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "not_in_svc2_svc4" +groups: ["sw_metric"] +topN: 2 +fieldValueSort: 1 +agg: 2 diff --git a/test/cases/topn/data/want/eq.yaml b/test/cases/topn/data/want/eq.yaml new file mode 100644 index 00000000..a9fe852a --- /dev/null +++ b/test/cases/topn/data/want/eq.yaml @@ -0,0 +1,43 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +lists: +- items: + - entity: + - key: service_id + value: + str: + value: svc_1 + - key: entity_id + value: + str: + value: entity_1 + value: + int: + value: "240" + - entity: + - key: service_id + value: + str: + value: svc_1 + - key: entity_id + value: + str: + value: entity_3 + value: + int: + value: "133" diff --git a/test/cases/topn/data/want/in.yaml b/test/cases/topn/data/want/in.yaml new file mode 100644 index 00000000..169ef690 --- /dev/null +++ b/test/cases/topn/data/want/in.yaml @@ -0,0 +1,43 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +lists: +- items: + - entity: + - key: service_id + value: + str: + value: svc_2 + - key: entity_id + value: + str: + value: entity_3 + value: + int: + value: "155" + - entity: + - key: service_id + value: + str: + value: svc_4 + - key: entity_id + value: + str: + value: entity_2 + value: + int: + value: "130" diff --git a/test/cases/topn/data/want/ne.yaml b/test/cases/topn/data/want/ne.yaml new file mode 100644 index 00000000..43727cd9 --- /dev/null +++ b/test/cases/topn/data/want/ne.yaml @@ -0,0 +1,42 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +lists: +- items: + - entity: + - key: service_id + value: + "null": null + - key: entity_id + value: + str: + value: entity_1 + value: + int: + value: "300" + - entity: + - key: service_id + value: + str: + value: svc_2 + - key: entity_id + value: + str: + value: entity_3 + value: + int: + value: "155" diff --git a/test/cases/topn/data/want/not_in.yaml b/test/cases/topn/data/want/not_in.yaml new file mode 100644 index 00000000..9ccb5d62 --- /dev/null +++ b/test/cases/topn/data/want/not_in.yaml @@ -0,0 +1,42 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +lists: +- items: + - entity: + - key: service_id + value: + "null": null + - key: entity_id + value: + str: + value: entity_1 + value: + int: + value: "300" + - entity: + - key: service_id + value: + str: + value: svc_1 + - key: entity_id + value: + str: + value: entity_1 + value: + int: + value: "240" diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go index 357eeaeb..bcb1d39b 100644 --- a/test/cases/topn/topn.go +++ b/test/cases/topn/topn.go @@ -44,4 +44,8 @@ var _ = g.DescribeTable("TopN Tests", verify, g.Entry("max top3 with condition order by desc", helpers.Args{Input: "condition_aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("max top3 for null group order by desc", helpers.Args{Input: "null_group", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("multi-group: max top3 order by desc", helpers.Args{Input: "multi_group_aggr_desc", Want: "aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("using equal in aggregation", helpers.Args{Input: "eq", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("using not equal in aggregation", helpers.Args{Input: "ne", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("using in operation in aggregation", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("using not-in operation in aggregation", helpers.Args{Input: "not_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )
