This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 07948326 Fix the index mode failure on multi-segments (#558)
07948326 is described below
commit 079483262016e8b2ac58c3f115f974d8ad7e9d2e
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Nov 16 21:37:44 2024 +0800
Fix the index mode failure on multi-segments (#558)
---
banyand/internal/storage/index.go | 3 +
banyand/measure/query.go | 38 ++++-
banyand/stream/query.go | 4 +-
test/cases/init.go | 1 +
test/cases/measure/data/input/index_mode_none.yaml | 30 ++++
.../data/testdata/service_traffic_data_old.json | 154 +++++++++++++++++++++
.../measure/data/want/index_mode_all_segs.yaml | 137 ++++++++++++++++++
test/cases/measure/measure.go | 2 +
8 files changed, 364 insertions(+), 5 deletions(-)
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index e25ad7ae..45432c99 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -110,6 +110,9 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
if err != nil {
return nil, nil, nil, err
}
+ if len(ss) == 0 {
+ return nil, nil, nil, nil
+ }
sl, fields, tss, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0)
if err != nil {
return nil, nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
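
[Editor's note] The hunk above adds an empty-result guard: when the series matchers select nothing, the function now returns immediately instead of handing an empty slice to convertIndexSeriesToSeriesList. A minimal sketch of that guard pattern, with hypothetical names standing in for the repository's types:

    // Editor's sketch of the guard pattern, not the repository's code.
    // search and convert are hypothetical stand-ins for the index lookup
    // and for convertIndexSeriesToSeriesList.
    package sketch

    func search(matchers []string) ([]string, error)  { return nil, nil }
    func convert(matched []string) ([]string, error)  { return matched, nil }

    func filter(matchers []string) ([]string, error) {
        matched, err := search(matchers)
        if err != nil {
            return nil, err
        }
        if len(matched) == 0 {
            // Nothing matched: short-circuit instead of converting an empty list.
            return nil, nil
        }
        return convert(matched)
    }
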
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index e0d07494..6519848b 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -46,6 +46,8 @@ const (
checkDoneEvery = 128
)
+var nilResult = model.MeasureQueryResult(nil)
+
// Query allow to retrieve measure data points.
type Query interface {
LoadGroup(name string) (resourceSchema.Group, bool)
@@ -91,7 +93,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
tsdb := db.(storage.TSDB[*tsTable, option])
segments := tsdb.SelectSegments(*mqo.TimeRange)
if len(segments) < 1 {
- return nil, nil
+ return nilResult, nil
}
if s.schema.IndexMode {
@@ -106,7 +108,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
for i := range segments {
segments[i].DecRef()
}
- return nil, nil
+ return nilResult, nil
}
result := queryResult{
ctx: ctx,
@@ -256,7 +258,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
segments []storage.Segment[*tsTable, option],
-) (*indexSortResult, error) {
+) (model.MeasureQueryResult, error) {
defer func() {
for i := range segments {
segments[i].DecRef()
@@ -300,7 +302,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
PreloadSize: preloadSize,
Projection: indexProjection,
}
-
+ seriesFilter := roaring.NewPostingList()
for i := range segments {
if mqo.TimeRange.Include(segments[i].GetTimeRange()) {
opts.TimeRange = nil
@@ -312,8 +314,22 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
if err != nil {
return nil, err
}
+ for j := 0; j < len(sr.sll); j++ {
+ if seriesFilter.Contains(uint64(sr.sll[j].ID)) {
+ sr.remove(j)
+ j--
+ continue
+ }
+ seriesFilter.Insert(uint64(sr.sll[j].ID))
+ }
+ if len(sr.sll) < 1 {
+ continue
+ }
r.segResults = append(r.segResults, sr)
}
+ if len(r.segResults) < 1 {
+ return nilResult, nil
+ }
heap.Init(&r.segResults)
return r, nil
}
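
[Editor's note] This is the heart of the multi-segment fix: an index-mode query that spans several segments can surface the same series in more than one of them, so each segment's result is filtered against a posting list of series IDs that earlier segments already contributed, and segments left empty after filtering are dropped (returning nilResult if none survive). A standalone sketch of that idea, using a plain map where the repository uses roaring.NewPostingList; the types here are illustrative only:

    // Editor's sketch of the cross-segment dedup idea, not the repository's code.
    package main

    import "fmt"

    type series struct{ ID uint64 }

    func dedupAcrossSegments(segments [][]series) [][]series {
        seen := make(map[uint64]struct{}) // stands in for the roaring posting list
        var out [][]series
        for _, seg := range segments {
            kept := seg[:0]
            for _, s := range seg {
                if _, ok := seen[s.ID]; ok {
                    continue // series already returned by an earlier segment
                }
                seen[s.ID] = struct{}{}
                kept = append(kept, s)
            }
            if len(kept) == 0 {
                continue // skip segments whose series were all duplicates
            }
            out = append(out, kept)
        }
        return out
    }

    func main() {
        segs := [][]series{{{1}, {2}}, {{2}, {3}}}
        fmt.Println(dedupAcrossSegments(segs)) // [[{1} {2}] [{3}]]
    }
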
@@ -804,10 +820,24 @@ type segResult struct {
i int
}
+func (sr *segResult) remove(i int) {
+ sr.sll = append(sr.sll[:i], sr.sll[i+1:]...)
+ if sr.frl != nil {
+ sr.frl = append(sr.frl[:i], sr.frl[i+1:]...)
+ }
+ sr.timestamps = append(sr.timestamps[:i], sr.timestamps[i+1:]...)
+ if sr.sortedValues != nil {
+ sr.sortedValues = append(sr.sortedValues[:i], sr.sortedValues[i+1:]...)
+ }
+}
+
type segResultHeap []*segResult
func (h segResultHeap) Len() int { return len(h) }
func (h segResultHeap) Less(i, j int) bool {
+ if h[i].sortedValues == nil {
+ return h[i].sll[h[i].i].ID < h[j].sll[h[j].i].ID
+ }
return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
}
func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
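
[Editor's note] Two supporting changes in this hunk: segResult.remove deletes position i from every parallel slice so series IDs, timestamps, and sort values stay aligned, and the heap comparator falls back to ordering by series ID when the query has no sort-by field (sortedValues is nil). A compiling sketch of such a comparator, with illustrative types rather than the repository's:

    // Editor's sketch of the comparator fallback; cursor and cursorHeap are
    // illustrative names, not BanyanDB types.
    package sketch

    import "bytes"

    type cursor struct {
        ids          []uint64 // series IDs, parallel to sortedValues
        sortedValues [][]byte // nil when the query is not sorted by a field
        i            int      // current position in this segment's result
    }

    type cursorHeap []*cursor

    func (h cursorHeap) Len() int      { return len(h) }
    func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
    func (h cursorHeap) Less(i, j int) bool {
        if h[i].sortedValues == nil {
            return h[i].ids[h[i].i] < h[j].ids[h[j].i] // fall back to series ID order
        }
        return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
    }
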
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index d8e14b28..e375ae16 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -44,6 +44,8 @@ import (
const checkDoneEvery = 128
+var nilResult = model.StreamQueryResult(nil)
+
func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and series are required")
@@ -53,7 +55,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
}
db := s.databaseSupplier.SupplyTSDB()
if db == nil {
- return sqr, nil
+ return nilResult, nil
}
var result queryResult
tsdb := db.(storage.TSDB[*tsTable, option])
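
[Editor's note] Both query.go files now return a shared nilResult, an explicitly nil interface value, instead of a typed value that merely happens to be nil. Presumably this keeps callers' `result == nil` checks reliable now that buildIndexQueryResult returns the model.MeasureQueryResult interface: a nil pointer wrapped in an interface is not equal to nil. A self-contained illustration of that Go pitfall, with made-up types (Result, pointerResult, query are not BanyanDB names):

    // Editor's sketch of the typed-nil pitfall the nilResult pattern sidesteps.
    package main

    import "fmt"

    type Result interface{ Pull() bool }

    type pointerResult struct{}

    func (*pointerResult) Pull() bool { return false }

    func query(empty bool) Result {
        var r *pointerResult // nil pointer
        if empty {
            // Returning the nil pointer wraps it in a non-nil interface value.
            return r
        }
        return &pointerResult{}
    }

    func main() {
        r := query(true)
        fmt.Println(r == nil) // false: the interface holds a (*pointerResult)(nil)

        // Returning an explicitly nil interface value keeps the check honest,
        // which is what `var nilResult = model.StreamQueryResult(nil)` expresses.
        var nilResult Result
        fmt.Println(nilResult == nil) // true
    }
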
diff --git a/test/cases/init.go b/test/cases/init.go
index 7f32b753..64b60fae 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -41,6 +41,7 @@ func Initialize(addr string, now time.Time) {
casesstreamdata.Write(conn, "duplicated", now, 0)
// // measure
interval = time.Minute
+ casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data_old.json", now.AddDate(0, 0, -1), interval)
casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
diff --git a/test/cases/measure/data/input/index_mode_none.yaml b/test/cases/measure/data/input/index_mode_none.yaml
new file mode 100644
index 00000000..0b5e5761
--- /dev/null
+++ b/test/cases/measure/data/input/index_mode_none.yaml
@@ -0,0 +1,30 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_traffic"
+groups: [ "sw_metric" ]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: [ "id", "service_id", "name", "short_name", "service_group", "layer" ]
+criteria:
+ condition:
+ name: "layer"
+ op: "BINARY_OP_EQ"
+ value:
+ int:
+ value: "-1"
diff --git a/test/cases/measure/data/testdata/service_traffic_data_old.json b/test/cases/measure/data/testdata/service_traffic_data_old.json
new file mode 100644
index 00000000..58246fdd
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_traffic_data_old.json
@@ -0,0 +1,154 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "service_1_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_1_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_1_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "group1_expired"
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "2"
+ }
+ },
+ {
+ "str": {
+ "value": "service_2_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_2_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_2_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "group1"
+ }
+ },
+ {
+ "int": {
+ "value": 2
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "3"
+ }
+ },
+ {
+ "str": {
+ "value": "service_3_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_3_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_3_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "group1_expired"
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "4"
+ }
+ },
+ {
+ "str": {
+ "value": "service_4"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_4"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_4"
+ }
+ },
+ {
+ "str": {
+ "value": "group4"
+ }
+ },
+ {
+ "int": {
+ "value": 3
+ }
+ }
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/test/cases/measure/data/want/index_mode_all_segs.yaml b/test/cases/measure/data/want/index_mode_all_segs.yaml
new file mode 100644
index 00000000..a10946c6
--- /dev/null
+++ b/test/cases/measure/data/want/index_mode_all_segs.yaml
@@ -0,0 +1,137 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+dataPoints:
+- sid: "15142466043926325685"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: "1"
+ - key: service_id
+ value:
+ str:
+ value: service_1
+ - key: name
+ value:
+ str:
+ value: service_name_1
+ - key: short_name
+ value:
+ str:
+ value: service_short_name_1
+ - key: service_group
+ value:
+ str:
+ value: group1
+ - key: layer
+ value:
+ int:
+ value: "1"
+ timestamp: "2024-11-16T11:17:00Z"
+ version: "1"
+- sid: "3906119849472468294"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: "2"
+ - key: service_id
+ value:
+ str:
+ value: service_2
+ - key: name
+ value:
+ str:
+ value: service_name_2
+ - key: short_name
+ value:
+ str:
+ value: service_short_name_2
+ - key: service_group
+ value:
+ str:
+ value: group1
+ - key: layer
+ value:
+ int:
+ value: "2"
+ timestamp: "2024-11-16T11:18:00Z"
+ version: "1"
+- sid: "12370392692163567533"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: "3"
+ - key: service_id
+ value:
+ str:
+ value: service_3
+ - key: name
+ value:
+ str:
+ value: service_name_3
+ - key: short_name
+ value:
+ str:
+ value: service_short_name_3
+ - key: service_group
+ value:
+ str:
+ value: group1
+ - key: layer
+ value:
+ int:
+ value: "1"
+ timestamp: "2024-11-16T11:19:00Z"
+ version: "1"
+- sid: "16450204962168035869"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: "4"
+ - key: service_id
+ value:
+ str:
+ value: service_4
+ - key: name
+ value:
+ str:
+ value: service_name_4
+ - key: short_name
+ value:
+ str:
+ value: service_short_name_4
+ - key: service_group
+ value:
+ str:
+ value: group4
+ - key: layer
+ value:
+ int:
+ value: "3"
+ timestamp: "2024-11-15T11:19:00Z"
+ version: "1"
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index bc31747d..39bd2bf3 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -73,6 +73,8 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 72 * time.Hour, Offset: -48 * time.Hour}),
g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)