This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2e85f73813baa96c7c63da99617ad2fad07a9be7 Author: Gao Hongtao <[email protected]> AuthorDate: Sat May 16 11:17:43 2026 +0800 fix(query): dedup IndexMode measures by series ID across data nodes (#1132) --- .../logical/measure/measure_plan_distributed.go | 39 ++++++++- .../measure/measure_plan_distributed_test.go | 93 ++++++++++++++++++++++ pkg/query/logical/measure/schema.go | 6 ++ pkg/query/logical/measure/schema_test.go | 90 +++++++++++++++++++++ 4 files changed, 225 insertions(+), 3 deletions(-) diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 01d46ee1a..b88ef5d07 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -116,6 +116,7 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest, pushDownAgg bool) l } func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) { + indexMode := s.(*schema).measure.IndexMode projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection()) if len(projectionTags) > 0 { var err error @@ -178,6 +179,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) sortTagSpec: *sortTagSpec, pushDownAgg: ud.pushDownAgg, groupByTagsRefs: groupByTagsRefs, + indexMode: indexMode, } if ud.originalQuery.OrderBy != nil && ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { result.desc = true @@ -191,6 +193,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) sortByTime: true, pushDownAgg: ud.pushDownAgg, groupByTagsRefs: groupByTagsRefs, + indexMode: indexMode, }, nil } if ud.originalQuery.OrderBy.IndexRuleName == "" { @@ -200,6 +203,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) sortByTime: true, pushDownAgg: ud.pushDownAgg, groupByTagsRefs: groupByTagsRefs, + indexMode: indexMode, } if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { 
result.desc = true @@ -224,6 +228,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) sortTagSpec: *sortTagSpec, pushDownAgg: ud.pushDownAgg, groupByTagsRefs: groupByTagsRefs, + indexMode: indexMode, } if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC { result.desc = true @@ -240,6 +245,7 @@ type distributedPlan struct { sortByTime bool desc bool pushDownAgg bool + indexMode bool } func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, err error) { @@ -311,7 +317,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e return &pushedDownAggregatedIterator{dataPoints: deduplicatedDps}, err } smi := &sortedMIterator{ - Iterator: sort.NewItemIter(see, t.desc), + Iterator: sort.NewItemIter(see, t.desc), + indexMode: t.indexMode, } smi.init() return smi, err @@ -421,9 +428,11 @@ type sortedMIterator struct { data *list.List uniqueData map[uint64]*measurev1.InternalDataPoint cur *measurev1.InternalDataPoint + seenSids map[uint64]struct{} initialized bool exhausted bool closed bool + indexMode bool } func (s *sortedMIterator) init() { @@ -437,6 +446,9 @@ func (s *sortedMIterator) init() { } s.data = list.New() s.uniqueData = make(map[uint64]*measurev1.InternalDataPoint) + if s.indexMode { + s.seenSids = make(map[uint64]struct{}) + } s.loadDps() } @@ -456,10 +468,23 @@ func (s *sortedMIterator) Next() bool { return true } +// loadDps consumes sort-field-equal groups from the underlying iterator and +// pushes their deduplicated rows into s.data. When indexMode is true, rows +// whose Sid was already emitted in a prior group are skipped — without this, +// cross-node duplicates that carry different per-node "last-write" timestamps +// would slip past hashDataPoint (which keys on Sid+timestamp) because they +// land in different sort-field groups. When every row in a group is filtered, +// the loop advances to the next group so callers see continued iteration. 
func (s *sortedMIterator) loadDps() { - if s.exhausted { - return + for !s.exhausted { + s.loadOneGroup() + if s.data.Len() > 0 { + return + } } +} + +func (s *sortedMIterator) loadOneGroup() { for k := range s.uniqueData { delete(s.uniqueData, k) } @@ -485,6 +510,13 @@ func (s *sortedMIterator) loadDps() { } } for _, v := range s.uniqueData { + if s.indexMode { + sid := v.GetDataPoint().GetSid() + if _, dup := s.seenSids[sid]; dup { + continue + } + s.seenSids[sid] = struct{}{} + } s.data.PushBack(v) } } @@ -501,6 +533,7 @@ func (s *sortedMIterator) Close() error { s.exhausted = true s.data = nil s.uniqueData = nil + s.seenSids = nil s.cur = nil if s.Iterator == nil { return nil diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go index 7468b4fd1..80b44897b 100644 --- a/pkg/query/logical/measure/measure_plan_distributed_test.go +++ b/pkg/query/logical/measure/measure_plan_distributed_test.go @@ -62,6 +62,99 @@ func makeComparableDP(sid uint64, seconds, nanos int64, version int64, sortField } } +func TestSortedMIterator_IndexModeDedup(t *testing.T) { + // Cross-node duplicates for an IndexMode measure carry the same Sid but + // different per-node "last-write" timestamps. They live in different + // sort-field groups when sorted by time, so hashDataPoint (which keys on + // Sid+timestamp) cannot collapse them — only Sid-based cross-group dedup + // can. See the doc comment on sortedMIterator.loadDps. + testCases := []struct { + name string + data []*comparableDataPoint + wantSids []uint64 + indexMode bool + }{ + { + // Sort-by-time, two nodes return the same Sid at different ts. + // IndexMode=true → only first-seen Sid survives. 
+ name: "indexMode collapses same Sid across sort-field groups", + data: []*comparableDataPoint{ + makeComparableDP(1, 1, 0, 1, 1), + makeComparableDP(1, 2, 0, 1, 2), + }, + indexMode: true, + wantSids: []uint64{1}, + }, + { + // Same input, IndexMode=false → existing behavior preserved. + name: "row-path keeps Sid duplicates with different timestamps", + data: []*comparableDataPoint{ + makeComparableDP(1, 1, 0, 1, 1), + makeComparableDP(1, 2, 0, 1, 2), + }, + indexMode: false, + wantSids: []uint64{1, 1}, + }, + { + // Sort-by-tag (same sort field) — both rows share one group but differ + // in timestamp, so the Sid filter, not hashDataPoint, collapses them. + name: "indexMode collapses same Sid within one sort-field group", + data: []*comparableDataPoint{ + makeComparableDP(2, 1, 0, 1, 5), + makeComparableDP(2, 2, 0, 1, 5), + }, + indexMode: true, + wantSids: []uint64{2}, + }, + { + // Three Sids interleaved across two sort-field groups. + // IndexMode=true keeps one per Sid (first-seen). + name: "indexMode preserves distinct Sids across groups", + data: []*comparableDataPoint{ + makeComparableDP(1, 1, 0, 1, 1), + makeComparableDP(2, 1, 0, 1, 1), + makeComparableDP(1, 2, 0, 1, 2), + makeComparableDP(3, 2, 0, 1, 2), + }, + indexMode: true, + wantSids: []uint64{1, 2, 3}, + }, + { + // Group at sf=2 contains only already-seen Sids — loadDps's outer + // loop must advance past the fully-filtered group to reach sf=3 + // instead of stopping iteration. Without that, `Next` returns + // false after the first emission and Sid=2 is lost. 
+ name: "indexMode advances past a fully-filtered intermediate group", + data: []*comparableDataPoint{ + makeComparableDP(1, 1, 0, 1, 1), + makeComparableDP(1, 2, 0, 1, 2), + makeComparableDP(2, 3, 0, 1, 3), + }, + indexMode: true, + wantSids: []uint64{1, 2}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + iter := &sortedMIterator{ + Iterator: &mockIterator{data: tc.data, idx: -1}, + indexMode: tc.indexMode, + } + iter.init() + gotSids := make([]uint64, 0, len(tc.wantSids)) + for iter.Next() { + gotSids = append(gotSids, iter.Current()[0].GetDataPoint().GetSid()) + } + slices.Sort(gotSids) + wantSids := append([]uint64(nil), tc.wantSids...) + slices.Sort(wantSids) + if diff := cmp.Diff(wantSids, gotSids); diff != "" { + t.Errorf("Sids mismatch (-want +got):\n%s", diff) + } + }) + } +} + func TestSortedMIterator(t *testing.T) { testCases := []struct { name string diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go index 3bdc81cad..3ce7b789f 100644 --- a/pkg/query/logical/measure/schema.go +++ b/pkg/query/logical/measure/schema.go @@ -136,6 +136,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) { var commonSchemas []*logical.CommonSchema var tagFamilies []*databasev1.TagFamilySpec + var canonicalMeasure *databasev1.Measure fieldMap := make(map[string]*logical.FieldSpec) for _, sm := range schemas { @@ -149,6 +150,10 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) { tagFamilies = logical.MergeTagFamilySpecs(tagFamilies, mSchema.measure.GetTagFamilies()) + if canonicalMeasure == nil { + canonicalMeasure = mSchema.measure + } + commonSchemas = append(commonSchemas, mSchema.common) for name, spec := range mSchema.fieldMap { @@ -180,6 +185,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) { } ret := &schema{ + measure: canonicalMeasure, common: mergedCommon, children: schemas, fieldMap: fieldMap, diff --git 
a/pkg/query/logical/measure/schema_test.go b/pkg/query/logical/measure/schema_test.go new file mode 100644 index 000000000..8fc8fe173 --- /dev/null +++ b/pkg/query/logical/measure/schema_test.go @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +package measure + +import ( + "testing" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" +) + +// mergeSchema is called from DistributedAnalyze whenever a measure query +// spans more than one group. The merged schema must keep enough of the +// underlying *databasev1.Measure that downstream consumers (notably the +// IndexMode check in unresolvedDistributed.Analyze) do not nil-deref. 
+func TestMergeSchema_PropagatesMeasure(t *testing.T) { + build := func(group string, indexMode bool) *schema { + md := &databasev1.Measure{ + Metadata: &commonv1.Metadata{Name: "zipkin_service_traffic_minute", Group: group}, + Entity: &databasev1.Entity{TagNames: []string{"id"}}, + TagFamilies: []*databasev1.TagFamilySpec{ + {Name: "storage-only", Tags: []*databasev1.TagSpec{{Name: "id", Type: databasev1.TagType_TAG_TYPE_STRING}}}, + }, + IndexMode: indexMode, + } + s, err := BuildSchema(md, nil) + if err != nil { + t.Fatalf("BuildSchema(%s) failed: %v", group, err) + } + return s.(*schema) + } + + t.Run("multi-group merge carries IndexMode=true", func(t *testing.T) { + a := build("sw_metadata", true) + b := build("sw_metadata_replica", true) + merged, err := mergeSchema([]logical.Schema{a, b}) + if err != nil { + t.Fatalf("mergeSchema: %v", err) + } + ms := merged.(*schema) + if ms.measure == nil { + t.Fatal("merged schema lost its *databasev1.Measure; downstream IndexMode checks will nil-deref") + } + if !ms.measure.IndexMode { + t.Fatalf("merged IndexMode = false, want true") + } + }) + + t.Run("multi-group merge carries IndexMode=false", func(t *testing.T) { + a := build("sw_metricsMinute", false) + b := build("sw_metricsMinute_replica", false) + merged, err := mergeSchema([]logical.Schema{a, b}) + if err != nil { + t.Fatalf("mergeSchema: %v", err) + } + ms := merged.(*schema) + if ms.measure == nil { + t.Fatal("merged schema lost its *databasev1.Measure") + } + if ms.measure.IndexMode { + t.Fatalf("merged IndexMode = true, want false") + } + }) + + t.Run("single-group fast path returns input unchanged", func(t *testing.T) { + a := build("sw_metadata", true) + merged, err := mergeSchema([]logical.Schema{a}) + if err != nil { + t.Fatalf("mergeSchema: %v", err) + } + if merged.(*schema) != a { + t.Fatal("single-group mergeSchema returned a new instance; expected pass-through") + } + }) +}
