This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch distributed-duplicated
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b05fcad1fb2d0c4b3d2e94e3d19963d6a92f0524
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 18 09:31:33 2024 +0800

    Deduplicate data points in distributed query
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/measure/v1/query.proto          |   7 +-
 docs/api-reference.md                              |   3 +-
 .../logical/measure/measure_plan_distributed.go    |  93 +++++++++++-
 .../measure/measure_plan_distributed_test.go       | 160 +++++++++++++++++++++
 .../measure/measure_plan_indexscan_local.go        |   1 +
 test/cases/measure/data/data.go                    |   2 +
 7 files changed, 261 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7fbff6b3..89623919 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 
 - Fix the filtering of stream in descending order by timestamp.
 - Fix querying old data points when the data is in a newer part. A version 
column is introduced to each data point and stored in the timestamp file.
+- Fix the bug that duplicate data points from different data nodes are returned in distributed queries.
 
 ## 0.6.1
 
diff --git a/api/proto/banyandb/measure/v1/query.proto 
b/api/proto/banyandb/measure/v1/query.proto
index 49ef6a62..11f30f68 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -40,8 +40,11 @@ message DataPoint {
   }
   // fields contains fields selected in the projection
   repeated Field fields = 3;
-  // version is the version of the data point
-  int64 version = 4;
+  // sid is the series id of the data point
+  uint64 sid = 4;
+  // version is the version of the data point in a series
+  // sid, timestamp and version are used to identify a data point
+  int64 version = 5;
 }
 
 // QueryResponse is the response for a query to the Query module.
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 56e5cbcc..2c2522f1 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2364,7 +2364,8 @@ DataPoint is stored in Measures
 | timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | 
timestamp is in the timeunit of milliseconds. |
 | tag_families | [banyandb.model.v1.TagFamily](#banyandb-model-v1-TagFamily) | 
repeated | tag_families contains tags selected in the projection |
 | fields | [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field) | repeated 
| fields contains fields selected in the projection |
-| version | [int64](#int64) |  | version is the version of the data point |
+| sid | [uint64](#uint64) |  | sid is the series id of the data point |
+| version | [int64](#int64) |  | version is the version of the data point in a 
series sid, timestamp and version are used to identify a data point |
 
 
 
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index 781bbd4c..31226984 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -18,6 +18,8 @@
 package measure
 
 import (
+       "bytes"
+       "container/list"
        "context"
        "fmt"
        "time"
@@ -179,9 +181,11 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(executor.MIterator, erro
                                        t.sortByTime, t.sortTagSpec))
                }
        }
-       return &sortedMIterator{
+       smi := &sortedMIterator{
                Iterator: sort.NewItemIter(see, t.desc),
-       }, allErr
+       }
+       smi.init()
+       return smi, allErr
 }
 
 func (t *distributedPlan) String() string {
@@ -278,8 +282,91 @@ var _ executor.MIterator = (*sortedMIterator)(nil)
 
 type sortedMIterator struct {
        sort.Iterator[*comparableDataPoint]
+       data        *list.List
+       uniqueData  map[uint64]*measurev1.DataPoint
+       cur         *measurev1.DataPoint
+       initialized bool
+       closed      bool
+}
+
+func (s *sortedMIterator) init() {
+       if s.initialized {
+               return
+       }
+       s.initialized = true
+       if !s.Iterator.Next() {
+               s.closed = true
+               return
+       }
+       s.data = list.New()
+       s.uniqueData = make(map[uint64]*measurev1.DataPoint)
+       s.loadDps()
+}
+
+func (s *sortedMIterator) Next() bool {
+       if s.data == nil {
+               return false
+       }
+       if s.data.Len() == 0 {
+               s.loadDps()
+               if s.data.Len() == 0 {
+                       return false
+               }
+       }
+       dp := s.data.Front()
+       s.data.Remove(dp)
+       s.cur = dp.Value.(*measurev1.DataPoint)
+       return true
+}
+
+func (s *sortedMIterator) loadDps() {
+       if s.closed {
+               return
+       }
+       for k := range s.uniqueData {
+               delete(s.uniqueData, k)
+       }
+       first := s.Iterator.Val()
+       s.uniqueData[hashDataPoint(first.DataPoint)] = first.DataPoint
+       for {
+               if !s.Iterator.Next() {
+                       s.closed = true
+                       break
+               }
+               v := s.Iterator.Val()
+               if bytes.Equal(first.SortedField(), v.SortedField()) {
+                       key := hashDataPoint(v.DataPoint)
+                       if existed, ok := s.uniqueData[key]; ok {
+                               if v.DataPoint.Version > existed.Version {
+                                       s.uniqueData[key] = v.DataPoint
+                               }
+                       } else {
+                               s.uniqueData[key] = v.DataPoint
+                       }
+               } else {
+                       break
+               }
+       }
+       for _, v := range s.uniqueData {
+               s.data.PushBack(v)
+       }
 }
 
 func (s *sortedMIterator) Current() []*measurev1.DataPoint {
-       return []*measurev1.DataPoint{s.Val().DataPoint}
+       return []*measurev1.DataPoint{s.cur}
+}
+
+const (
+       offset64 = 14695981039346656037
+       prime64  = 1099511628211
+)
+
+// hashDataPoint calculates the hash value of a data point with fnv64a.
+// https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
+func hashDataPoint(dp *measurev1.DataPoint) uint64 {
+       h := uint64(offset64)
+       h = (h ^ dp.Sid) * prime64
+       h = (h ^ uint64(dp.Timestamp.Seconds)) * prime64
+       h = (h ^ uint64(dp.Timestamp.Nanos)) * prime64
+       return h
 }
diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go 
b/pkg/query/logical/measure/measure_plan_distributed_test.go
new file mode 100644
index 00000000..e8e1dfb2
--- /dev/null
+++ b/pkg/query/logical/measure/measure_plan_distributed_test.go
@@ -0,0 +1,160 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "slices"
+       "testing"
+
+       "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/testing/protocmp"
+       timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+)
+
+// mockIterator is an in-memory iterator that feeds a fixed slice of points to
+// sortedMIterator in tests. The tests construct it with idx: -1 so that the
+// first Next call lands on data[0].
+type mockIterator struct {
+       data []*comparableDataPoint
+       idx  int
+}
+
+// Next moves to the following element and reports whether it is in range.
+func (mi *mockIterator) Next() bool {
+       next := mi.idx + 1
+       mi.idx = next
+       return next < len(mi.data)
+}
+
+// Val returns the element at the current position.
+func (mi *mockIterator) Val() *comparableDataPoint {
+       return mi.data[mi.idx]
+}
+
+// Close holds no resources and always succeeds.
+func (mi *mockIterator) Close() error {
+       return nil
+}
+
+// TestSortedMIterator feeds pre-sorted points through a mockIterator and
+// checks which points survive sortedMIterator's deduplication.
+func TestSortedMIterator(t *testing.T) {
+       newDP := func(sid uint64, seconds int64, nanos int32, version int64) *measurev1.DataPoint {
+               return &measurev1.DataPoint{
+                       Sid:       sid,
+                       Timestamp: &timestamppb.Timestamp{Seconds: seconds, Nanos: nanos},
+                       Version:   version,
+               }
+       }
+       // withSortKey pairs a data point with the one-byte sortField that
+       // sortedMIterator groups on.
+       withSortKey := func(dp *measurev1.DataPoint, key byte) *comparableDataPoint {
+               return &comparableDataPoint{DataPoint: dp, sortField: []byte{key}}
+       }
+       // byIdentity orders points by sid, timestamp and version. It compares
+       // explicitly instead of subtracting: converting an unsigned or wide
+       // difference to int can flip the sign for large values.
+       byIdentity := func(a, b *measurev1.DataPoint) int {
+               switch {
+               case a.Sid < b.Sid:
+                       return -1
+               case a.Sid > b.Sid:
+                       return 1
+               case a.Timestamp.Seconds < b.Timestamp.Seconds:
+                       return -1
+               case a.Timestamp.Seconds > b.Timestamp.Seconds:
+                       return 1
+               case a.Timestamp.Nanos < b.Timestamp.Nanos:
+                       return -1
+               case a.Timestamp.Nanos > b.Timestamp.Nanos:
+                       return 1
+               case a.Version < b.Version:
+                       return -1
+               case a.Version > b.Version:
+                       return 1
+               default:
+                       return 0
+               }
+       }
+
+       testCases := []struct {
+               name string
+               data []*comparableDataPoint
+               want []*measurev1.DataPoint
+       }{
+               {
+                       name: "empty data",
+                       data: []*comparableDataPoint{},
+                       want: []*measurev1.DataPoint{},
+               },
+               {
+                       name: "all data points are the same",
+                       data: []*comparableDataPoint{
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                       },
+                       want: []*measurev1.DataPoint{
+                               newDP(1, 1, 1, 1),
+                       },
+               },
+               {
+                       name: "identical data points with different sort fields",
+                       data: []*comparableDataPoint{
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                               withSortKey(newDP(1, 1, 1, 1), 2),
+                       },
+                       want: []*measurev1.DataPoint{
+                               newDP(1, 1, 1, 1),
+                               newDP(1, 1, 1, 1),
+                       },
+               },
+               {
+                       name: "different data points with different sort fields",
+                       data: []*comparableDataPoint{
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                               withSortKey(newDP(1, 2, 2, 1), 2),
+                       },
+                       want: []*measurev1.DataPoint{
+                               newDP(1, 1, 1, 1),
+                               newDP(1, 2, 2, 1),
+                       },
+               },
+               {
+                       name: "identical data points with different versions",
+                       data: []*comparableDataPoint{
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                               withSortKey(newDP(1, 2, 2, 1), 1),
+                               withSortKey(newDP(1, 1, 1, 2), 1),
+                       },
+                       want: []*measurev1.DataPoint{
+                               newDP(1, 1, 1, 2),
+                               newDP(1, 2, 2, 1),
+                       },
+               },
+               {
+                       name: "several series with duplicates across sort groups",
+                       data: []*comparableDataPoint{
+                               withSortKey(newDP(1, 1, 1, 1), 1),
+                               withSortKey(newDP(2, 1, 1, 1), 1),
+                               withSortKey(newDP(1, 1, 1, 2), 1),
+                               withSortKey(newDP(1, 2, 2, 2), 2),
+                               withSortKey(newDP(1, 2, 2, 1), 2),
+                               withSortKey(newDP(2, 2, 2, 2), 2),
+                               withSortKey(newDP(1, 3, 3, 1), 3),
+                               withSortKey(newDP(1, 3, 3, 2), 3),
+                               withSortKey(newDP(3, 3, 3, 2), 3),
+                       },
+                       want: []*measurev1.DataPoint{
+                               newDP(1, 1, 1, 2),
+                               newDP(1, 2, 2, 2),
+                               newDP(1, 3, 3, 2),
+                               newDP(2, 1, 1, 1),
+                               newDP(2, 2, 2, 2),
+                               newDP(3, 3, 3, 2),
+                       },
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       iter := &sortedMIterator{
+                               Iterator: &mockIterator{data: tc.data, idx: -1},
+                       }
+                       iter.init()
+
+                       // Non-nil so that it matches the empty want slice under cmp.Diff.
+                       got := make([]*measurev1.DataPoint, 0)
+                       for iter.Next() {
+                               got = append(got, iter.Current()[0])
+                       }
+                       // The order of points inside a sort group is not asserted here.
+                       slices.SortFunc(got, byIdentity)
+
+                       if diff := cmp.Diff(tc.want, got, protocmp.Transform(), protocmp.IgnoreUnknown()); diff != "" {
+                               t.Errorf("sortedMIterator mismatch (-want +got):\n%s", diff)
+                       }
+               })
+       }
+}
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 4079f79a..c5de0da2 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -223,6 +223,7 @@ func (ei *resultMIterator) Next() bool {
        for i := range r.Timestamps {
                dp := &measurev1.DataPoint{
                        Timestamp: timestamppb.New(time.Unix(0, 
r.Timestamps[i])),
+                       Sid:       uint64(r.SID),
                        Version:   r.Versions[i],
                }
 
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 347d6b99..3077c510 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -78,12 +78,14 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
        for i := range resp.DataPoints {
                if resp.DataPoints[i].Timestamp != nil {
                        
innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0))
+                       
innerGm.Expect(resp.DataPoints[i].Sid).Should(gm.BeNumerically(">", 0))
                }
        }
        innerGm.Expect(cmp.Equal(resp, want,
                protocmp.IgnoreUnknown(),
                protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
                protocmp.IgnoreFields(&measurev1.DataPoint{}, "version"),
+               protocmp.IgnoreFields(&measurev1.DataPoint{}, "sid"),
                protocmp.Transform())).
                To(gm.BeTrue(), func() string {
                        j, err := protojson.Marshal(resp)

Reply via email to