This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0145fba3 Fix series match crash (#499)
0145fba3 is described below

commit 0145fba320a1a1eca77cba25dac52301407e28f5
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jul 26 22:02:23 2024 +0800

    Fix series match crash (#499)
---
 banyand/internal/storage/index.go    | 18 ++++++++++--------
 banyand/measure/query.go             | 10 +++++++++-
 banyand/stream/query.go              | 14 +++++++++-----
 bydbctl/internal/cmd/measure_test.go |  4 +++-
 bydbctl/internal/cmd/stream_test.go  |  4 +++-
 pkg/pb/v1/series.go                  |  7 +++++++
 6 files changed, 41 insertions(+), 16 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index e86c8dd2..5abb9e91 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -111,8 +111,10 @@ var emptySeriesMatcher = index.SeriesMatcher{}
 func convertEntityValuesToSeriesMatcher(series *pbv1.Series) 
(index.SeriesMatcher, error) {
        var hasAny, hasWildcard bool
        var prefixIndex int
+       var localSeries pbv1.Series
+       series.CopyTo(&localSeries)
 
-       for i, tv := range series.EntityValues {
+       for i, tv := range localSeries.EntityValues {
                if tv == nil {
                        return emptySeriesMatcher, errors.New("unexpected nil 
tag value")
                }
@@ -133,29 +135,29 @@ func convertEntityValuesToSeriesMatcher(series 
*pbv1.Series) (index.SeriesMatche
 
        if hasAny {
                if hasWildcard {
-                       if err = series.MarshalWithWildcard(); err != nil {
+                       if err = localSeries.MarshalWithWildcard(); err != nil {
                                return emptySeriesMatcher, err
                        }
                        return index.SeriesMatcher{
                                Type:  index.SeriesMatcherTypeWildcard,
-                               Match: series.Buffer,
+                               Match: localSeries.Buffer,
                        }, nil
                }
-               series.EntityValues = series.EntityValues[:prefixIndex]
-               if err = series.Marshal(); err != nil {
+               localSeries.EntityValues = 
localSeries.EntityValues[:prefixIndex]
+               if err = localSeries.Marshal(); err != nil {
                        return emptySeriesMatcher, err
                }
                return index.SeriesMatcher{
                        Type:  index.SeriesMatcherTypePrefix,
-                       Match: series.Buffer,
+                       Match: localSeries.Buffer,
                }, nil
        }
-       if err = series.Marshal(); err != nil {
+       if err = localSeries.Marshal(); err != nil {
                return emptySeriesMatcher, err
        }
        return index.SeriesMatcher{
                Type:  index.SeriesMatcherTypeExact,
-               Match: series.Buffer,
+               Match: localSeries.Buffer,
        }, nil
 }
 
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 882b2d34..19645d68 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -32,6 +32,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -149,13 +150,20 @@ func (s *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
 func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, 
mqo model.MeasureQueryOptions,
        segments []storage.Segment[*tsTable, option],
 ) (sl pbv1.SeriesList, tables []*tsTable, err error) {
+       seriesFilter := roaring.NewPostingList()
        for i := range segments {
                tables = append(tables, segments[i].Tables()...)
                sll, err := segments[i].IndexDB().Search(ctx, series, 
mqo.Filter, mqo.Order, preloadSize)
                if err != nil {
                        return nil, nil, err
                }
-               sl = append(sl, sll...)
+               for j := range sll {
+                       if seriesFilter.Contains(uint64(sll[j].ID)) {
+                               continue
+                       }
+                       sl = append(sl, sll[j])
+                       seriesFilter.Insert(uint64(sll[j].ID))
+               }
        }
        return sl, tables, nil
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 5892187a..b70ce0b4 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -63,8 +63,6 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                        result.Release()
                }
        }()
-       var tables []*tsTable
-
        series := make([]*pbv1.Series, len(sqo.Entities))
        for i := range sqo.Entities {
                series[i] = &pbv1.Series{
@@ -73,14 +71,20 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                }
        }
        var seriesList, sl pbv1.SeriesList
+       seriesFilter := roaring.NewPostingList()
        for i := range result.segments {
-               tables = append(tables, result.segments[i].Tables()...)
                sl, err = result.segments[i].Lookup(ctx, series)
                if err != nil {
                        return nil, err
                }
-               seriesList = append(seriesList, sl...)
-               result.tabs = append(result.tabs, tables...)
+               for j := range sl {
+                       if seriesFilter.Contains(uint64(sl[j].ID)) {
+                               continue
+                       }
+                       seriesList = append(seriesList, sl[j])
+                       seriesFilter.Insert(uint64(sl[j].ID))
+               }
+               result.tabs = append(result.tabs, 
result.segments[i].Tables()...)
        }
 
        if len(seriesList) == 0 {
diff --git a/bydbctl/internal/cmd/measure_test.go 
b/bydbctl/internal/cmd/measure_test.go
index 34706b2b..0f0e65ca 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -193,7 +193,9 @@ var _ = Describe("Measure Data Query", func() {
        var startStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               now = timestamp.NowMilli()
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
                startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
                interval = 1 * time.Millisecond
                endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/stream_test.go 
b/bydbctl/internal/cmd/stream_test.go
index e156d48d..f413e1bc 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -194,7 +194,9 @@ var _ = Describe("Stream Data Query", func() {
        var nowStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               now = timestamp.NowMilli()
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
                nowStr = now.Format(time.RFC3339)
                interval = 500 * time.Millisecond
                endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index f6d47397..754d155e 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -38,6 +38,13 @@ type Series struct {
        ID           common.SeriesID
 }
 
+// CopyTo copies the content of the series to the destination series.
+func (s *Series) CopyTo(dst *Series) {
+       dst.Subject = s.Subject
+       dst.EntityValues = make([]*modelv1.TagValue, len(s.EntityValues))
+       copy(dst.EntityValues, s.EntityValues)
+}
+
 // Marshal encodes series to internal Buffer and generates ID.
 func (s *Series) Marshal() error {
        s.Buffer = marshalEntityValue(s.Buffer, 
convert.StringToBytes(s.Subject))

Reply via email to