This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0145fba3 Fix series match crash (#499)
0145fba3 is described below
commit 0145fba320a1a1eca77cba25dac52301407e28f5
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jul 26 22:02:23 2024 +0800
Fix series match crash (#499)
---
banyand/internal/storage/index.go | 18 ++++++++++--------
banyand/measure/query.go | 10 +++++++++-
banyand/stream/query.go | 14 +++++++++-----
bydbctl/internal/cmd/measure_test.go | 4 +++-
bydbctl/internal/cmd/stream_test.go | 4 +++-
pkg/pb/v1/series.go | 7 +++++++
6 files changed, 41 insertions(+), 16 deletions(-)
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index e86c8dd2..5abb9e91 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -111,8 +111,10 @@ var emptySeriesMatcher = index.SeriesMatcher{}
func convertEntityValuesToSeriesMatcher(series *pbv1.Series) (index.SeriesMatcher, error) {
var hasAny, hasWildcard bool
var prefixIndex int
+ var localSeries pbv1.Series
+ series.CopyTo(&localSeries)
- for i, tv := range series.EntityValues {
+ for i, tv := range localSeries.EntityValues {
if tv == nil {
return emptySeriesMatcher, errors.New("unexpected nil tag value")
}
@@ -133,29 +135,29 @@ func convertEntityValuesToSeriesMatcher(series *pbv1.Series) (index.SeriesMatche
if hasAny {
if hasWildcard {
- if err = series.MarshalWithWildcard(); err != nil {
+ if err = localSeries.MarshalWithWildcard(); err != nil {
return emptySeriesMatcher, err
}
return index.SeriesMatcher{
Type: index.SeriesMatcherTypeWildcard,
- Match: series.Buffer,
+ Match: localSeries.Buffer,
}, nil
}
- series.EntityValues = series.EntityValues[:prefixIndex]
- if err = series.Marshal(); err != nil {
+ localSeries.EntityValues = localSeries.EntityValues[:prefixIndex]
+ if err = localSeries.Marshal(); err != nil {
return emptySeriesMatcher, err
}
return index.SeriesMatcher{
Type: index.SeriesMatcherTypePrefix,
- Match: series.Buffer,
+ Match: localSeries.Buffer,
}, nil
}
- if err = series.Marshal(); err != nil {
+ if err = localSeries.Marshal(); err != nil {
return emptySeriesMatcher, err
}
return index.SeriesMatcher{
Type: index.SeriesMatcherTypeExact,
- Match: series.Buffer,
+ Match: localSeries.Buffer,
}, nil
}
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 882b2d34..19645d68 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -32,6 +32,7 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -149,13 +150,20 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
segments []storage.Segment[*tsTable, option],
) (sl pbv1.SeriesList, tables []*tsTable, err error) {
+ seriesFilter := roaring.NewPostingList()
for i := range segments {
tables = append(tables, segments[i].Tables()...)
sll, err := segments[i].IndexDB().Search(ctx, series, mqo.Filter, mqo.Order, preloadSize)
if err != nil {
return nil, nil, err
}
- sl = append(sl, sll...)
+ for j := range sll {
+ if seriesFilter.Contains(uint64(sll[j].ID)) {
+ continue
+ }
+ sl = append(sl, sll[j])
+ seriesFilter.Insert(uint64(sll[j].ID))
+ }
}
return sl, tables, nil
}
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 5892187a..b70ce0b4 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -63,8 +63,6 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
result.Release()
}
}()
- var tables []*tsTable
-
series := make([]*pbv1.Series, len(sqo.Entities))
for i := range sqo.Entities {
series[i] = &pbv1.Series{
@@ -73,14 +71,20 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
}
}
var seriesList, sl pbv1.SeriesList
+ seriesFilter := roaring.NewPostingList()
for i := range result.segments {
- tables = append(tables, result.segments[i].Tables()...)
sl, err = result.segments[i].Lookup(ctx, series)
if err != nil {
return nil, err
}
- seriesList = append(seriesList, sl...)
- result.tabs = append(result.tabs, tables...)
+ for j := range sl {
+ if seriesFilter.Contains(uint64(sl[j].ID)) {
+ continue
+ }
+ seriesList = append(seriesList, sl[j])
+ seriesFilter.Insert(uint64(sl[j].ID))
+ }
+ result.tabs = append(result.tabs, result.segments[i].Tables()...)
}
if len(seriesList) == 0 {
diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go
index 34706b2b..0f0e65ca 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -193,7 +193,9 @@ var _ = Describe("Measure Data Query", func() {
var startStr, endStr string
var interval time.Duration
BeforeEach(func() {
- now = timestamp.NowMilli()
+ var err error
+ now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local)
+ Expect(err).NotTo(HaveOccurred())
startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
interval = 1 * time.Millisecond
endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go
index e156d48d..f413e1bc 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -194,7 +194,9 @@ var _ = Describe("Stream Data Query", func() {
var nowStr, endStr string
var interval time.Duration
BeforeEach(func() {
- now = timestamp.NowMilli()
+ var err error
+ now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local)
+ Expect(err).NotTo(HaveOccurred())
nowStr = now.Format(time.RFC3339)
interval = 500 * time.Millisecond
endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index f6d47397..754d155e 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -38,6 +38,13 @@ type Series struct {
ID common.SeriesID
}
+// CopyTo copies the content of the series to the destination series.
+func (s *Series) CopyTo(dst *Series) {
+ dst.Subject = s.Subject
+ dst.EntityValues = make([]*modelv1.TagValue, len(s.EntityValues))
+ copy(dst.EntityValues, s.EntityValues)
+}
+
// Marshal encodes series to internal Buffer and generates ID.
func (s *Series) Marshal() error {
s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject))