This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4320e7cee fix(query): dedup IndexMode measures by series ID across data nodes (#1132)
4320e7cee is described below

commit 4320e7ceea42abc12540bd7a9c658d0059cbba95
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat May 16 11:17:43 2026 +0800

    fix(query): dedup IndexMode measures by series ID across data nodes (#1132)
---
 .../logical/measure/measure_plan_distributed.go    | 39 ++++++++-
 .../measure/measure_plan_distributed_test.go       | 93 ++++++++++++++++++++++
 pkg/query/logical/measure/schema.go                |  6 ++
 pkg/query/logical/measure/schema_test.go           | 90 +++++++++++++++++++++
 4 files changed, 225 insertions(+), 3 deletions(-)

diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 364d75392..0b7a3734d 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -119,6 +119,7 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest, pushDownAgg bool) l
 }
 
 func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, 
error) {
+       indexMode := s.(*schema).measure.IndexMode
        projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection())
        if len(projectionTags) > 0 {
                var err error
@@ -181,6 +182,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        sortTagSpec:     *sortTagSpec,
                        pushDownAgg:     ud.pushDownAgg,
                        groupByTagsRefs: groupByTagsRefs,
+                       indexMode:       indexMode,
                }
                if ud.originalQuery.OrderBy != nil && 
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -194,6 +196,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        sortByTime:      true,
                        pushDownAgg:     ud.pushDownAgg,
                        groupByTagsRefs: groupByTagsRefs,
+                       indexMode:       indexMode,
                }, nil
        }
        if ud.originalQuery.OrderBy.IndexRuleName == "" {
@@ -203,6 +206,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        sortByTime:      true,
                        pushDownAgg:     ud.pushDownAgg,
                        groupByTagsRefs: groupByTagsRefs,
+                       indexMode:       indexMode,
                }
                if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -227,6 +231,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                sortTagSpec:     *sortTagSpec,
                pushDownAgg:     ud.pushDownAgg,
                groupByTagsRefs: groupByTagsRefs,
+               indexMode:       indexMode,
        }
        if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                result.desc = true
@@ -243,6 +248,7 @@ type distributedPlan struct {
        sortByTime        bool
        desc              bool
        pushDownAgg       bool
+       indexMode         bool
 }
 
 func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, 
err error) {
@@ -314,7 +320,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                return &pushedDownAggregatedIterator{dataPoints: 
deduplicatedDps}, err
        }
        smi := &sortedMIterator{
-               Iterator: sort.NewItemIter(see, t.desc),
+               Iterator:  sort.NewItemIter(see, t.desc),
+               indexMode: t.indexMode,
        }
        smi.init()
        return smi, err
@@ -424,9 +431,11 @@ type sortedMIterator struct {
        data        *list.List
        uniqueData  map[uint64]*measurev1.InternalDataPoint
        cur         *measurev1.InternalDataPoint
+       seenSids    map[uint64]struct{}
        initialized bool
        exhausted   bool
        closed      bool
+       indexMode   bool
 }
 
 func (s *sortedMIterator) init() {
@@ -440,6 +449,9 @@ func (s *sortedMIterator) init() {
        }
        s.data = list.New()
        s.uniqueData = make(map[uint64]*measurev1.InternalDataPoint)
+       if s.indexMode {
+               s.seenSids = make(map[uint64]struct{})
+       }
        s.loadDps()
 }
 
@@ -459,10 +471,23 @@ func (s *sortedMIterator) Next() bool {
        return true
 }
 
+// loadDps consumes sort-field-equal groups from the underlying iterator until
+// at least one row has been pushed into s.data or the iterator is exhausted.
+// Within a group, rows are collapsed through s.uniqueData, keyed by
+// hashDataPoint (Sid+timestamp). When indexMode is true, loadOneGroup also
+// drops every row whose Sid is already in s.seenSids — whether it was recorded
+// in a prior group or earlier in the same group. Without this, cross-node
+// duplicates that carry different per-node "last-write" timestamps would slip
+// past hashDataPoint because they land in different sort-field groups.
+// Because s.uniqueData is a map, which of several same-Sid rows in one group
+// survives follows map iteration order and is not deterministic.
+// When every row in a group is filtered, the loop advances to the next group
+// so callers see continued iteration.
+//
+// NOTE(review): the s.data.Len() > 0 exit assumes s.data is empty on entry
+// (the caller drains it before reloading) — confirm against Next.
 func (s *sortedMIterator) loadDps() {
-       if s.exhausted {
-               return
+       for !s.exhausted {
+               s.loadOneGroup()
+               // A group whose rows were all filtered leaves s.data empty;
+               // keep pulling groups instead of ending iteration early.
+               if s.data.Len() > 0 {
+                       return
+               }
        }
+}
+
+func (s *sortedMIterator) loadOneGroup() {
        for k := range s.uniqueData {
                delete(s.uniqueData, k)
        }
@@ -488,6 +513,13 @@ func (s *sortedMIterator) loadDps() {
                }
        }
        for _, v := range s.uniqueData {
+               if s.indexMode {
+                       sid := v.GetDataPoint().GetSid()
+                       if _, dup := s.seenSids[sid]; dup {
+                               continue
+                       }
+                       s.seenSids[sid] = struct{}{}
+               }
                s.data.PushBack(v)
        }
 }
@@ -504,6 +536,7 @@ func (s *sortedMIterator) Close() error {
        s.exhausted = true
        s.data = nil
        s.uniqueData = nil
+       s.seenSids = nil
        s.cur = nil
        if s.Iterator == nil {
                return nil
diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go
index 7468b4fd1..80b44897b 100644
--- a/pkg/query/logical/measure/measure_plan_distributed_test.go
+++ b/pkg/query/logical/measure/measure_plan_distributed_test.go
@@ -62,6 +62,99 @@ func makeComparableDP(sid uint64, seconds, nanos int64, version int64, sortField
        }
 }
 
+func TestSortedMIterator_IndexModeDedup(t *testing.T) {
+       // Cross-node duplicates for an IndexMode measure carry the same Sid but
+       // different per-node "last-write" timestamps. They live in different
+       // sort-field groups when sorted by time, so hashDataPoint (which keys on
+       // Sid+timestamp) cannot collapse them — only Sid-based cross-group dedup
+       // can. See the doc comment on sortedMIterator.loadDps.
+       testCases := []struct {
+               name      string
+               data      []*comparableDataPoint
+               wantSids  []uint64
+               indexMode bool
+       }{
+               {
+                       // Sort-by-time, two nodes return the same Sid at different ts.
+                       // IndexMode=true → only first-seen Sid survives.
+                       name: "indexMode collapses same Sid across sort-field groups",
+                       data: []*comparableDataPoint{
+                               makeComparableDP(1, 1, 0, 1, 1),
+                               makeComparableDP(1, 2, 0, 1, 2),
+                       },
+                       indexMode: true,
+                       wantSids:  []uint64{1},
+               },
+               {
+                       // Same input, IndexMode=false → existing behavior preserved.
+                       name: "row-path keeps Sid duplicates with different timestamps",
+                       data: []*comparableDataPoint{
+                               makeComparableDP(1, 1, 0, 1, 1),
+                               makeComparableDP(1, 2, 0, 1, 2),
+                       },
+                       indexMode: false,
+                       wantSids:  []uint64{1, 1},
+               },
+               {
+                       // Both rows share sort field 5, so they land in one group. Their
+                       // timestamps differ, so hashDataPoint (Sid+timestamp) keeps both
+                       // in uniqueData; the IndexMode seenSids check is what collapses
+                       // them. Verify IndexMode dedups within a single group.
+                       name: "indexMode collapses same Sid within one sort-field group",
+                       data: []*comparableDataPoint{
+                               makeComparableDP(2, 1, 0, 1, 5),
+                               makeComparableDP(2, 2, 0, 1, 5),
+                       },
+                       indexMode: true,
+                       wantSids:  []uint64{2},
+               },
+               {
+                       // Three Sids interleaved across two sort-field groups.
+                       // IndexMode=true keeps one per Sid (first-seen).
+                       name: "indexMode preserves distinct Sids across groups",
+                       data: []*comparableDataPoint{
+                               makeComparableDP(1, 1, 0, 1, 1),
+                               makeComparableDP(2, 1, 0, 1, 1),
+                               makeComparableDP(1, 2, 0, 1, 2),
+                               makeComparableDP(3, 2, 0, 1, 2),
+                       },
+                       indexMode: true,
+                       wantSids:  []uint64{1, 2, 3},
+               },
+               {
+                       // Group at sf=2 contains only already-seen Sids — loadDps's outer
+                       // loop must advance past the fully-filtered group to reach sf=3
+                       // instead of stopping iteration. Without that, `Next` returns
+                       // false after the first emission and Sid=2 is lost.
+                       name: "indexMode advances past a fully-filtered intermediate group",
+                       data: []*comparableDataPoint{
+                               makeComparableDP(1, 1, 0, 1, 1),
+                               makeComparableDP(1, 2, 0, 1, 2),
+                               makeComparableDP(2, 3, 0, 1, 3),
+                       },
+                       indexMode: true,
+                       wantSids:  []uint64{1, 2},
+               },
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       iter := &sortedMIterator{
+                               Iterator:  &mockIterator{data: tc.data, idx: -1},
+                               indexMode: tc.indexMode,
+                       }
+                       iter.init()
+                       gotSids := make([]uint64, 0, len(tc.wantSids))
+                       for iter.Next() {
+                               gotSids = append(gotSids, iter.Current()[0].GetDataPoint().GetSid())
+                       }
+                       // Rows within one sort-field group are emitted in map-iteration
+                       // order, so compare the Sids as sorted multisets.
+                       slices.Sort(gotSids)
+                       wantSids := slices.Clone(tc.wantSids)
+                       slices.Sort(wantSids)
+                       if diff := cmp.Diff(wantSids, gotSids); diff != "" {
+                               t.Errorf("Sids mismatch (-want +got):\n%s", diff)
+                       }
+               })
+       }
+}
+
 func TestSortedMIterator(t *testing.T) {
        testCases := []struct {
                name string
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index 2b7fbd86a..52bbff785 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -140,6 +140,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
 
        var commonSchemas []*logical.CommonSchema
        var tagFamilies []*databasev1.TagFamilySpec
+       var canonicalMeasure *databasev1.Measure
        fieldMap := make(map[string]*logical.FieldSpec)
 
        for _, sm := range schemas {
@@ -153,6 +154,10 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
 
                tagFamilies = logical.MergeTagFamilySpecs(tagFamilies, 
mSchema.measure.GetTagFamilies())
 
+               if canonicalMeasure == nil {
+                       canonicalMeasure = mSchema.measure
+               }
+
                commonSchemas = append(commonSchemas, mSchema.common)
 
                for name, spec := range mSchema.fieldMap {
@@ -184,6 +189,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
        }
 
        ret := &schema{
+               measure:  canonicalMeasure,
                common:   mergedCommon,
                children: schemas,
                fieldMap: fieldMap,
diff --git a/pkg/query/logical/measure/schema_test.go b/pkg/query/logical/measure/schema_test.go
new file mode 100644
index 000000000..8fc8fe173
--- /dev/null
+++ b/pkg/query/logical/measure/schema_test.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied.  See the License for the specific language governing permissions
+// and limitations under the License.
+
+package measure
+
+import (
+       "testing"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+// mergeSchema combines the per-group schemas of a measure query that spans
+// more than one group (per the original note, it is reached from
+// DistributedAnalyze). The merged schema must keep an underlying
+// *databasev1.Measure: unresolvedDistributed.Analyze reads
+// s.(*schema).measure.IndexMode, which would nil-deref otherwise.
+//
+// NOTE(review): mergeSchema adopts the first schema's measure as canonical;
+// groups that disagree on IndexMode are not exercised here.
+func TestMergeSchema_PropagatesMeasure(t *testing.T) {
+       build := func(group string, indexMode bool) *schema {
+               t.Helper()
+               md := &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Name: "zipkin_service_traffic_minute", Group: group},
+                       Entity:   &databasev1.Entity{TagNames: []string{"id"}},
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {Name: "storage-only", Tags: []*databasev1.TagSpec{{Name: "id", Type: databasev1.TagType_TAG_TYPE_STRING}}},
+                       },
+                       IndexMode: indexMode,
+               }
+               s, err := BuildSchema(md, nil)
+               if err != nil {
+                       t.Fatalf("BuildSchema(%s) failed: %v", group, err)
+               }
+               return s.(*schema)
+       }
+
+       t.Run("multi-group merge carries IndexMode=true", func(t *testing.T) {
+               a := build("sw_metadata", true)
+               b := build("sw_metadata_replica", true)
+               merged, err := mergeSchema([]logical.Schema{a, b})
+               if err != nil {
+                       t.Fatalf("mergeSchema: %v", err)
+               }
+               ms := merged.(*schema)
+               if ms.measure == nil {
+                       t.Fatal("merged schema lost its *databasev1.Measure; downstream IndexMode checks will nil-deref")
+               }
+               if !ms.measure.IndexMode {
+                       t.Fatal("merged IndexMode = false, want true")
+               }
+       })
+
+       t.Run("multi-group merge carries IndexMode=false", func(t *testing.T) {
+               a := build("sw_metricsMinute", false)
+               b := build("sw_metricsMinute_replica", false)
+               merged, err := mergeSchema([]logical.Schema{a, b})
+               if err != nil {
+                       t.Fatalf("mergeSchema: %v", err)
+               }
+               ms := merged.(*schema)
+               if ms.measure == nil {
+                       t.Fatal("merged schema lost its *databasev1.Measure")
+               }
+               if ms.measure.IndexMode {
+                       t.Fatal("merged IndexMode = true, want false")
+               }
+       })
+
+       // A single schema is returned as-is rather than rebuilt.
+       t.Run("single-group fast path returns input unchanged", func(t *testing.T) {
+               a := build("sw_metadata", true)
+               merged, err := mergeSchema([]logical.Schema{a})
+               if err != nil {
+                       t.Fatalf("mergeSchema: %v", err)
+               }
+               if merged.(*schema) != a {
+                       t.Fatal("single-group mergeSchema returned a new instance; expected pass-through")
+               }
+       })
+}

Reply via email to