This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4320e7cee fix(query): dedup IndexMode measures by series ID across data nodes (#1132)
4320e7cee is described below
commit 4320e7ceea42abc12540bd7a9c658d0059cbba95
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat May 16 11:17:43 2026 +0800
fix(query): dedup IndexMode measures by series ID across data nodes (#1132)
---
.../logical/measure/measure_plan_distributed.go | 39 ++++++++-
.../measure/measure_plan_distributed_test.go | 93 ++++++++++++++++++++++
pkg/query/logical/measure/schema.go | 6 ++
pkg/query/logical/measure/schema_test.go | 90 +++++++++++++++++++++
4 files changed, 225 insertions(+), 3 deletions(-)
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 364d75392..0b7a3734d 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -119,6 +119,7 @@ func newUnresolvedDistributed(query *measurev1.QueryRequest, pushDownAgg bool) l
}
func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
+ indexMode := s.(*schema).measure.IndexMode
projectionTags := logical.ToTags(ud.originalQuery.GetTagProjection())
if len(projectionTags) > 0 {
var err error
@@ -181,6 +182,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
sortTagSpec: *sortTagSpec,
pushDownAgg: ud.pushDownAgg,
groupByTagsRefs: groupByTagsRefs,
+ indexMode: indexMode,
}
if ud.originalQuery.OrderBy != nil && ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -194,6 +196,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
sortByTime: true,
pushDownAgg: ud.pushDownAgg,
groupByTagsRefs: groupByTagsRefs,
+ indexMode: indexMode,
}, nil
}
if ud.originalQuery.OrderBy.IndexRuleName == "" {
@@ -203,6 +206,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
sortByTime: true,
pushDownAgg: ud.pushDownAgg,
groupByTagsRefs: groupByTagsRefs,
+ indexMode: indexMode,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -227,6 +231,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
sortTagSpec: *sortTagSpec,
pushDownAgg: ud.pushDownAgg,
groupByTagsRefs: groupByTagsRefs,
+ indexMode: indexMode,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -243,6 +248,7 @@ type distributedPlan struct {
sortByTime bool
desc bool
pushDownAgg bool
+ indexMode bool
}
func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, err error) {
@@ -314,7 +320,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
return &pushedDownAggregatedIterator{dataPoints: deduplicatedDps}, err
}
smi := &sortedMIterator{
- Iterator: sort.NewItemIter(see, t.desc),
+ Iterator: sort.NewItemIter(see, t.desc),
+ indexMode: t.indexMode,
}
smi.init()
return smi, err
@@ -424,9 +431,11 @@ type sortedMIterator struct {
data *list.List
uniqueData map[uint64]*measurev1.InternalDataPoint
cur *measurev1.InternalDataPoint
+ seenSids map[uint64]struct{}
initialized bool
exhausted bool
closed bool
+ indexMode bool
}
func (s *sortedMIterator) init() {
@@ -440,6 +449,9 @@ func (s *sortedMIterator) init() {
}
s.data = list.New()
s.uniqueData = make(map[uint64]*measurev1.InternalDataPoint)
+ if s.indexMode {
+ s.seenSids = make(map[uint64]struct{})
+ }
s.loadDps()
}
@@ -459,10 +471,23 @@ func (s *sortedMIterator) Next() bool {
return true
}
+// loadDps consumes sort-field-equal groups from the underlying iterator and
+// pushes their deduplicated rows into s.data. When indexMode is true, rows
+// whose Sid was already emitted (in a prior group or earlier in this one) are skipped — without this,
+// cross-node duplicates that carry different per-node "last-write" timestamps
+// would slip past hashDataPoint (which keys on Sid+timestamp) because they
+// land in different sort-field groups. When every row in a group is filtered,
+// the loop advances to the next group so callers see continued iteration.
func (s *sortedMIterator) loadDps() {
- if s.exhausted {
- return
+ for !s.exhausted {
+ s.loadOneGroup()
+ if s.data.Len() > 0 {
+ return
+ }
}
+}
+
+func (s *sortedMIterator) loadOneGroup() {
for k := range s.uniqueData {
delete(s.uniqueData, k)
}
@@ -488,6 +513,13 @@ func (s *sortedMIterator) loadDps() {
}
}
for _, v := range s.uniqueData {
+ if s.indexMode {
+ sid := v.GetDataPoint().GetSid()
+ if _, dup := s.seenSids[sid]; dup {
+ continue
+ }
+ s.seenSids[sid] = struct{}{}
+ }
s.data.PushBack(v)
}
}
@@ -504,6 +536,7 @@ func (s *sortedMIterator) Close() error {
s.exhausted = true
s.data = nil
s.uniqueData = nil
+ s.seenSids = nil
s.cur = nil
if s.Iterator == nil {
return nil
diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go
index 7468b4fd1..80b44897b 100644
--- a/pkg/query/logical/measure/measure_plan_distributed_test.go
+++ b/pkg/query/logical/measure/measure_plan_distributed_test.go
@@ -62,6 +62,99 @@ func makeComparableDP(sid uint64, seconds, nanos int64, version int64, sortField
}
}
+func TestSortedMIterator_IndexModeDedup(t *testing.T) {
+ // Cross-node duplicates for an IndexMode measure carry the same Sid but
+ // different per-node "last-write" timestamps. They live in different
+ // sort-field groups when sorted by time, so hashDataPoint (which keys on
+ // Sid+timestamp) cannot collapse them — only Sid-based cross-group dedup
+ // can. See the doc comment on sortedMIterator.loadDps.
+ testCases := []struct {
+ name string
+ data []*comparableDataPoint
+ wantSids []uint64
+ indexMode bool
+ }{
+ {
+ // Sort-by-time, two nodes return the same Sid at different ts.
+ // IndexMode=true → only first-seen Sid survives.
+ name: "indexMode collapses same Sid across sort-field groups",
+ data: []*comparableDataPoint{
+ makeComparableDP(1, 1, 0, 1, 1),
+ makeComparableDP(1, 2, 0, 1, 2),
+ },
+ indexMode: true,
+ wantSids: []uint64{1},
+ },
+ {
+ // Same input, IndexMode=false → existing behavior preserved.
+ name: "row-path keeps Sid duplicates with different timestamps",
+ data: []*comparableDataPoint{
+ makeComparableDP(1, 1, 0, 1, 1),
+ makeComparableDP(1, 2, 0, 1, 2),
+ },
+ indexMode: false,
+ wantSids: []uint64{1, 1},
+ },
+ {
+ // Same Sid and sort field but different timestamps: hashDataPoint
+ // (Sid+timestamp) keeps both, so IndexMode's Sid check must collapse them.
+ name: "indexMode collapses same Sid within one sort-field group",
+ data: []*comparableDataPoint{
+ makeComparableDP(2, 1, 0, 1, 5),
+ makeComparableDP(2, 2, 0, 1, 5),
+ },
+ indexMode: true,
+ wantSids: []uint64{2},
+ },
+ {
+ // Three Sids interleaved across two sort-field groups.
+ // IndexMode=true keeps one per Sid (first-seen).
+ name: "indexMode preserves distinct Sids across groups",
+ data: []*comparableDataPoint{
+ makeComparableDP(1, 1, 0, 1, 1),
+ makeComparableDP(2, 1, 0, 1, 1),
+ makeComparableDP(1, 2, 0, 1, 2),
+ makeComparableDP(3, 2, 0, 1, 2),
+ },
+ indexMode: true,
+ wantSids: []uint64{1, 2, 3},
+ },
+ {
+ // Group at sf=2 contains only already-seen Sids — loadDps's outer
+ // loop must advance past the fully-filtered group to reach sf=3
+ // instead of stopping iteration. Without that, `Next` returns
+ // false after the first emission and Sid=2 is lost.
+ name: "indexMode advances past a fully-filtered intermediate group",
+ data: []*comparableDataPoint{
+ makeComparableDP(1, 1, 0, 1, 1),
+ makeComparableDP(1, 2, 0, 1, 2),
+ makeComparableDP(2, 3, 0, 1, 3),
+ },
+ indexMode: true,
+ wantSids: []uint64{1, 2},
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ iter := &sortedMIterator{
+ Iterator: &mockIterator{data: tc.data, idx: -1},
+ indexMode: tc.indexMode,
+ }
+ iter.init()
+ gotSids := make([]uint64, 0, len(tc.wantSids))
+ for iter.Next() {
+ gotSids = append(gotSids, iter.Current()[0].GetDataPoint().GetSid())
+ }
+ slices.Sort(gotSids)
+ wantSids := append([]uint64(nil), tc.wantSids...)
+ slices.Sort(wantSids)
+ if diff := cmp.Diff(wantSids, gotSids); diff != "" {
+ t.Errorf("Sids mismatch (-want +got):\n%s", diff)
+ }
+ })
+ }
+}
+
func TestSortedMIterator(t *testing.T) {
testCases := []struct {
name string
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index 2b7fbd86a..52bbff785 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -140,6 +140,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
var commonSchemas []*logical.CommonSchema
var tagFamilies []*databasev1.TagFamilySpec
+ var canonicalMeasure *databasev1.Measure
fieldMap := make(map[string]*logical.FieldSpec)
for _, sm := range schemas {
@@ -153,6 +154,10 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
tagFamilies = logical.MergeTagFamilySpecs(tagFamilies, mSchema.measure.GetTagFamilies())
+ if canonicalMeasure == nil {
+ canonicalMeasure = mSchema.measure
+ }
+
commonSchemas = append(commonSchemas, mSchema.common)
for name, spec := range mSchema.fieldMap {
@@ -184,6 +189,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
}
ret := &schema{
+ measure: canonicalMeasure,
common: mergedCommon,
children: schemas,
fieldMap: fieldMap,
diff --git a/pkg/query/logical/measure/schema_test.go b/pkg/query/logical/measure/schema_test.go
new file mode 100644
index 000000000..8fc8fe173
--- /dev/null
+++ b/pkg/query/logical/measure/schema_test.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+package measure
+
+import (
+ "testing"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+// mergeSchema is called from DistributedAnalyze whenever a measure query
+// spans more than one group. The merged schema must keep enough of the
+// underlying *databasev1.Measure that downstream consumers (notably the
+// IndexMode check in unresolvedDistributed.Analyze) do not nil-deref.
+func TestMergeSchema_PropagatesMeasure(t *testing.T) {
+ build := func(group string, indexMode bool) *schema {
+ md := &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Name: "zipkin_service_traffic_minute", Group: group},
+ Entity: &databasev1.Entity{TagNames: []string{"id"}},
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {Name: "storage-only", Tags: []*databasev1.TagSpec{{Name: "id", Type: databasev1.TagType_TAG_TYPE_STRING}}},
+ },
+ IndexMode: indexMode,
+ }
+ s, err := BuildSchema(md, nil)
+ if err != nil {
+ t.Fatalf("BuildSchema(%s) failed: %v", group, err)
+ }
+ return s.(*schema)
+ }
+
+ t.Run("multi-group merge carries IndexMode=true", func(t *testing.T) {
+ a := build("sw_metadata", true)
+ b := build("sw_metadata_replica", true)
+ merged, err := mergeSchema([]logical.Schema{a, b})
+ if err != nil {
+ t.Fatalf("mergeSchema: %v", err)
+ }
+ ms := merged.(*schema)
+ if ms.measure == nil {
+ t.Fatal("merged schema lost its *databasev1.Measure; downstream IndexMode checks will nil-deref")
+ }
+ if !ms.measure.IndexMode {
+ t.Fatalf("merged IndexMode = false, want true")
+ }
+ })
+
+ t.Run("multi-group merge carries IndexMode=false", func(t *testing.T) {
+ a := build("sw_metricsMinute", false)
+ b := build("sw_metricsMinute_replica", false)
+ merged, err := mergeSchema([]logical.Schema{a, b})
+ if err != nil {
+ t.Fatalf("mergeSchema: %v", err)
+ }
+ ms := merged.(*schema)
+ if ms.measure == nil {
+ t.Fatal("merged schema lost its *databasev1.Measure")
+ }
+ if ms.measure.IndexMode {
+ t.Fatalf("merged IndexMode = true, want false")
+ }
+ })
+
+ t.Run("single-group fast path returns input unchanged", func(t *testing.T) {
+ a := build("sw_metadata", true)
+ merged, err := mergeSchema([]logical.Schema{a})
+ if err != nil {
+ t.Fatalf("mergeSchema: %v", err)
+ }
+ if merged.(*schema) != a {
+ t.Fatal("single-group mergeSchema returned a new instance; expected pass-through")
+ }
+ })
+}