This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch topn in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 09c558b97b853dfcfa6e2b82d6b5740b29bdf576 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jun 25 21:04:52 2025 +0800 Fix topN parsing panic when the criteria is set Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/measure/metadata.go | 29 ++++++++++++++++++++++ .../topn_aggregations/not_in_svc2_svc4 copy.json | 28 +++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index a9d87e88..0edad3ac 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "path" + "strings" "sync" "time" @@ -236,6 +237,7 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) { Kind: resourceSchema.EventKindGroup, Metadata: g, }) + sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name) case schema.KindMeasure: m := metadata.Spec.(*databasev1.Measure) sr.SendMetadataEvent(resourceSchema.MetadataEvent{ @@ -338,6 +340,33 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchema } } +func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) { + var keysToDelete []string + groupPrefix := groupName + "/" + + sr.topNProcessorMap.Range(func(key, val any) bool { + keyStr := key.(string) + if strings.HasPrefix(keyStr, groupPrefix) { + keysToDelete = append(keysToDelete, keyStr) + } + return true + }) + + for _, key := range keysToDelete { + if v, ok := sr.topNProcessorMap.Load(key); ok { + manager := v.(*topNProcessorManager) + if err := manager.Close(); err != nil { + sr.l.Error().Err(err).Str("key", key).Msg("failed to close topN processor manager") + } + sr.topNProcessorMap.Delete(key) + } + } + + if len(keysToDelete) > 0 { + sr.l.Info().Str("groupName", groupName).Int("count", len(keysToDelete)).Msg("stopped topN processors for group") + } +} + var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { diff --git a/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json new file mode 100644 index 00000000..d95e3c7e --- /dev/null +++ b/pkg/test/measure/testdata/topn_aggregations/not_in_svc2_svc4 copy.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "not_in_svc2_svc4", + "group": "sw_metric" + }, + "source_measure": { + "name": "service_instance_endpoint_cpm_minute", + "group": "sw_metric" + }, + "field_name": "total", + "field_value_sort": 0, + "group_by_tag_names": [ + "http.uri" + ], + "criteria": { + "condition":{ + "name": "service_id", + "op": "BINARY_OP_NOT_IN", + "value": { + "str_array": { + "value": ["svc_2", "svc_4"] + } + } + } + }, + "counters_number": 1000, + "lru_size": 10 +} \ No newline at end of file