This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch distributed-duplicated in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b05fcad1fb2d0c4b3d2e94e3d19963d6a92f0524 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jun 18 09:31:33 2024 +0800 Deduplicate data points in distributed query Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + api/proto/banyandb/measure/v1/query.proto | 7 +- docs/api-reference.md | 3 +- .../logical/measure/measure_plan_distributed.go | 93 +++++++++++- .../measure/measure_plan_distributed_test.go | 160 +++++++++++++++++++++ .../measure/measure_plan_indexscan_local.go | 1 + test/cases/measure/data/data.go | 2 + 7 files changed, 261 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7fbff6b3..89623919 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ Release Notes. - Fix the filtering of stream in descending order by timestamp. - Fix querying old data points when the data is in a newer part. A version column is introduced to each data point and stored in the timestamp file. +- Fix the bug that duplicated data points from different data nodes are returned. ## 0.6.1 diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto index 49ef6a62..11f30f68 100644 --- a/api/proto/banyandb/measure/v1/query.proto +++ b/api/proto/banyandb/measure/v1/query.proto @@ -40,8 +40,11 @@ message DataPoint { } // fields contains fields selected in the projection repeated Field fields = 3; - // version is the version of the data point - int64 version = 4; + // sid is the series id of the data point + uint64 sid = 4; + // version is the version of the data point in a series + // sid, timestamp and version are used to identify a data point + int64 version = 5; } // QueryResponse is the response for a query to the Query module. 
diff --git a/docs/api-reference.md b/docs/api-reference.md index 56e5cbcc..2c2522f1 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -2364,7 +2364,8 @@ DataPoint is stored in Measures | timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. | | tag_families | [banyandb.model.v1.TagFamily](#banyandb-model-v1-TagFamily) | repeated | tag_families contains tags selected in the projection | | fields | [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field) | repeated | fields contains fields selected in the projection | -| version | [int64](#int64) | | version is the version of the data point | +| sid | [uint64](#uint64) | | sid is the series id of the data point | +| version | [int64](#int64) | | version is the version of the data point in a series sid, timestamp and version are used to identify a data point | diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 781bbd4c..31226984 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -18,6 +18,8 @@ package measure import ( + "bytes" + "container/list" "context" "fmt" "time" @@ -179,9 +181,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro t.sortByTime, t.sortTagSpec)) } } - return &sortedMIterator{ + smi := &sortedMIterator{ Iterator: sort.NewItemIter(see, t.desc), - }, allErr + } + smi.init() + return smi, allErr } func (t *distributedPlan) String() string { @@ -278,8 +282,91 @@ var _ executor.MIterator = (*sortedMIterator)(nil) type sortedMIterator struct { sort.Iterator[*comparableDataPoint] + data *list.List + uniqueData map[uint64]*measurev1.DataPoint + cur *measurev1.DataPoint + initialized bool + closed bool +} + +func (s *sortedMIterator) init() { + if s.initialized { + return + } + s.initialized = true + if !s.Iterator.Next() { + s.closed = true + 
return + } + s.data = list.New() + s.uniqueData = make(map[uint64]*measurev1.DataPoint) + s.loadDps() +} + +func (s *sortedMIterator) Next() bool { + if s.data == nil { + return false + } + if s.data.Len() == 0 { + s.loadDps() + if s.data.Len() == 0 { + return false + } + } + dp := s.data.Front() + s.data.Remove(dp) + s.cur = dp.Value.(*measurev1.DataPoint) + return true +} + +func (s *sortedMIterator) loadDps() { + if s.closed { + return + } + for k := range s.uniqueData { + delete(s.uniqueData, k) + } + first := s.Iterator.Val() + s.uniqueData[hashDataPoint(first.DataPoint)] = first.DataPoint + for { + if !s.Iterator.Next() { + s.closed = true + break + } + v := s.Iterator.Val() + if bytes.Equal(first.SortedField(), v.SortedField()) { + key := hashDataPoint(v.DataPoint) + if existed, ok := s.uniqueData[key]; ok { + if v.DataPoint.Version > existed.Version { + s.uniqueData[key] = v.DataPoint + } + } else { + s.uniqueData[key] = v.DataPoint + } + } else { + break + } + } + for _, v := range s.uniqueData { + s.data.PushBack(v) + } } func (s *sortedMIterator) Current() []*measurev1.DataPoint { - return []*measurev1.DataPoint{s.Val().DataPoint} + return []*measurev1.DataPoint{s.cur} +} + +const ( + offset64 = 14695981039346656037 + prime64 = 1099511628211 +) + +// hashDataPoint calculates the hash value of a data point with fnv64a. 
+// https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function +func hashDataPoint(dp *measurev1.DataPoint) uint64 { + h := uint64(offset64) + h = (h ^ dp.Sid) * prime64 + h = (h ^ uint64(dp.Timestamp.Seconds)) * prime64 + h = (h ^ uint64(dp.Timestamp.Nanos)) * prime64 + return h } diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go new file mode 100644 index 00000000..e8e1dfb2 --- /dev/null +++ b/pkg/query/logical/measure/measure_plan_distributed_test.go @@ -0,0 +1,160 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "slices" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/testing/protocmp" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" +) + +type mockIterator struct { + data []*comparableDataPoint + idx int +} + +func (m *mockIterator) Next() bool { + m.idx++ + return m.idx < len(m.data) +} + +func (m *mockIterator) Val() *comparableDataPoint { + return m.data[m.idx] +} + +func (m *mockIterator) Close() error { + return nil +} + +func TestSortedMIterator(t *testing.T) { + testCases := []struct { + name string + data []*comparableDataPoint + want []*measurev1.DataPoint + }{ + { + name: "empty data", + data: []*comparableDataPoint{}, + want: []*measurev1.DataPoint{}, + }, + { + name: "all data points are the same", + data: []*comparableDataPoint{ + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + }, + want: []*measurev1.DataPoint{ + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, + }, + }, + { + name: "identical data points with different sort fields", + data: []*comparableDataPoint{ + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{2}}, + }, + want: []*measurev1.DataPoint{ + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, + }, + }, + { + name: "different data points with different sort fields", + data: []*comparableDataPoint{ + {DataPoint: &measurev1.DataPoint{Sid: 1, 
Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}}, + }, + want: []*measurev1.DataPoint{ + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, + }, + }, + { + name: "identical data points with different versions", + data: []*comparableDataPoint{ + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}}, + }, + want: []*measurev1.DataPoint{ + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, + }, + }, + { + name: "identical data points with different versions", + data: []*comparableDataPoint{ + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}}, + {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}}, + {DataPoint: 
&measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 1}, sortField: []byte{3}}, + {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}}, + {DataPoint: &measurev1.DataPoint{Sid: 3, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}}, + }, + want: []*measurev1.DataPoint{ + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, + {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, + {Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, + {Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, + {Sid: 3, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + iter := &sortedMIterator{ + Iterator: &mockIterator{data: tc.data, idx: -1}, + } + + iter.init() + + got := make([]*measurev1.DataPoint, 0) + for iter.Next() { + got = append(got, iter.Current()[0]) + } + + slices.SortFunc(got, func(a, b *measurev1.DataPoint) int { + if diff := a.Sid - b.Sid; diff != 0 { + return int(diff) + } + if diff := a.Timestamp.Seconds - b.Timestamp.Seconds; diff != 0 { + return int(diff) + } + if diff := a.Timestamp.Nanos - b.Timestamp.Nanos; diff != 0 { + return int(diff) + } + return int(a.Version - b.Version) + }) + + if diff := cmp.Diff(tc.want, got, + protocmp.Transform(), protocmp.IgnoreUnknown()); diff != "" { + t.Errorf("sortedMIterator mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index 4079f79a..c5de0da2 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -223,6 +223,7 
@@ func (ei *resultMIterator) Next() bool { for i := range r.Timestamps { dp := &measurev1.DataPoint{ Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), + Sid: uint64(r.SID), Version: r.Versions[i], } diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 347d6b99..3077c510 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -78,12 +78,14 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args for i := range resp.DataPoints { if resp.DataPoints[i].Timestamp != nil { innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0)) + innerGm.Expect(resp.DataPoints[i].Sid).Should(gm.BeNumerically(">", 0)) } } innerGm.Expect(cmp.Equal(resp, want, protocmp.IgnoreUnknown(), protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"), protocmp.IgnoreFields(&measurev1.DataPoint{}, "version"), + protocmp.IgnoreFields(&measurev1.DataPoint{}, "sid"), protocmp.Transform())). To(gm.BeTrue(), func() string { j, err := protojson.Marshal(resp)
