This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 07948326 Fix the index mode failure on multi-segments (#558)
07948326 is described below

commit 079483262016e8b2ac58c3f115f974d8ad7e9d2e
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Nov 16 21:37:44 2024 +0800

    Fix the index mode failure on multi-segments (#558)
---
 banyand/internal/storage/index.go                  |   3 +
 banyand/measure/query.go                           |  38 ++++-
 banyand/stream/query.go                            |   4 +-
 test/cases/init.go                                 |   1 +
 test/cases/measure/data/input/index_mode_none.yaml |  30 ++++
 .../data/testdata/service_traffic_data_old.json    | 154 +++++++++++++++++++++
 .../measure/data/want/index_mode_all_segs.yaml     | 137 ++++++++++++++++++
 test/cases/measure/measure.go                      |   2 +
 8 files changed, 364 insertions(+), 5 deletions(-)

diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index e25ad7ae..45432c99 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -110,6 +110,9 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
        if err != nil {
                return nil, nil, nil, err
        }
+       if len(ss) == 0 {
+               return nil, nil, nil, nil
+       }
        sl, fields, tss, err = convertIndexSeriesToSeriesList(ss, 
len(projection) > 0)
        if err != nil {
                return nil, nil, nil, errors.WithMessagef(err, "failed to 
convert index series to series list, matchers: %v, matched: %d", 
seriesMatchers, len(ss))
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index e0d07494..6519848b 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -46,6 +46,8 @@ const (
        checkDoneEvery = 128
 )
 
+var nilResult = model.MeasureQueryResult(nil)
+
 // Query allow to retrieve measure data points.
 type Query interface {
        LoadGroup(name string) (resourceSchema.Group, bool)
@@ -91,7 +93,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
        tsdb := db.(storage.TSDB[*tsTable, option])
        segments := tsdb.SelectSegments(*mqo.TimeRange)
        if len(segments) < 1 {
-               return nil, nil
+               return nilResult, nil
        }
 
        if s.schema.IndexMode {
@@ -106,7 +108,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
                for i := range segments {
                        segments[i].DecRef()
                }
-               return nil, nil
+               return nilResult, nil
        }
        result := queryResult{
                ctx:              ctx,
@@ -256,7 +258,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
 
 func (s *measure) buildIndexQueryResult(ctx context.Context, series 
[]*pbv1.Series, mqo model.MeasureQueryOptions,
        segments []storage.Segment[*tsTable, option],
-) (*indexSortResult, error) {
+) (model.MeasureQueryResult, error) {
        defer func() {
                for i := range segments {
                        segments[i].DecRef()
@@ -300,7 +302,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
                PreloadSize: preloadSize,
                Projection:  indexProjection,
        }
-
+       seriesFilter := roaring.NewPostingList()
        for i := range segments {
                if mqo.TimeRange.Include(segments[i].GetTimeRange()) {
                        opts.TimeRange = nil
@@ -312,8 +314,22 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
                if err != nil {
                        return nil, err
                }
+               for j := 0; j < len(sr.sll); j++ {
+                       if seriesFilter.Contains(uint64(sr.sll[j].ID)) {
+                               sr.remove(j)
+                               j--
+                               continue
+                       }
+                       seriesFilter.Insert(uint64(sr.sll[j].ID))
+               }
+               if len(sr.sll) < 1 {
+                       continue
+               }
                r.segResults = append(r.segResults, sr)
        }
+       if len(r.segResults) < 1 {
+               return nilResult, nil
+       }
        heap.Init(&r.segResults)
        return r, nil
 }
@@ -804,10 +820,24 @@ type segResult struct {
        i            int
 }
 
+func (sr *segResult) remove(i int) {
+       sr.sll = append(sr.sll[:i], sr.sll[i+1:]...)
+       if sr.frl != nil {
+               sr.frl = append(sr.frl[:i], sr.frl[i+1:]...)
+       }
+       sr.timestamps = append(sr.timestamps[:i], sr.timestamps[i+1:]...)
+       if sr.sortedValues != nil {
+               sr.sortedValues = append(sr.sortedValues[:i], 
sr.sortedValues[i+1:]...)
+       }
+}
+
 type segResultHeap []*segResult
 
 func (h segResultHeap) Len() int { return len(h) }
 func (h segResultHeap) Less(i, j int) bool {
+       if h[i].sortedValues == nil {
+               return h[i].sll[h[i].i].ID < h[j].sll[h[j].i].ID
+       }
        return bytes.Compare(h[i].sortedValues[h[i].i], 
h[j].sortedValues[h[j].i]) < 0
 }
 func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index d8e14b28..e375ae16 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -44,6 +44,8 @@ import (
 
 const checkDoneEvery = 128
 
+var nilResult = model.StreamQueryResult(nil)
+
 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr 
model.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
@@ -53,7 +55,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
        }
        db := s.databaseSupplier.SupplyTSDB()
        if db == nil {
-               return sqr, nil
+               return nilResult, nil
        }
        var result queryResult
        tsdb := db.(storage.TSDB[*tsTable, option])
diff --git a/test/cases/init.go b/test/cases/init.go
index 7f32b753..64b60fae 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -41,6 +41,7 @@ func Initialize(addr string, now time.Time) {
        casesstreamdata.Write(conn, "duplicated", now, 0)
        // // measure
        interval = time.Minute
+       casesmeasuredata.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data_old.json", now.AddDate(0, 0, -1), interval)
        casesmeasuredata.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
diff --git a/test/cases/measure/data/input/index_mode_none.yaml b/test/cases/measure/data/input/index_mode_none.yaml
new file mode 100644
index 00000000..0b5e5761
--- /dev/null
+++ b/test/cases/measure/data/input/index_mode_none.yaml
@@ -0,0 +1,30 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_traffic"
+groups: [ "sw_metric" ]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: [ "id", "service_id", "name", "short_name", "service_group", "layer" 
]
+criteria:
+  condition:
+    name: "layer"
+    op: "BINARY_OP_EQ"
+    value:
+      int:
+        value: "-1"
diff --git a/test/cases/measure/data/testdata/service_traffic_data_old.json b/test/cases/measure/data/testdata/service_traffic_data_old.json
new file mode 100644
index 00000000..58246fdd
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_traffic_data_old.json
@@ -0,0 +1,154 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "1"
+            }
+          },
+          {
+            "str": {
+              "value": "service_1_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_1_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_1_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "group1_expired"
+            }
+          },
+          {
+            "int": {
+              "value": 1
+            }
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "2"
+            }
+          },
+          {
+            "str": {
+              "value": "service_2_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_2_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_2_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "group1"
+            }
+          },
+          {
+            "int": {
+              "value": 2
+            }
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "3"
+            }
+          },
+          {
+            "str": {
+              "value": "service_3_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_3_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_3_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "group1_expired"
+            }
+          },
+          {
+            "int": {
+              "value": 1
+            }
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "4"
+            }
+          },
+          {
+            "str": {
+              "value": "service_4"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_4"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_4"
+            }
+          },
+          {
+            "str": {
+              "value": "group4"
+            }
+          },
+          {
+            "int": {
+              "value": 3
+            }
+          }
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/test/cases/measure/data/want/index_mode_all_segs.yaml b/test/cases/measure/data/want/index_mode_all_segs.yaml
new file mode 100644
index 00000000..a10946c6
--- /dev/null
+++ b/test/cases/measure/data/want/index_mode_all_segs.yaml
@@ -0,0 +1,137 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+dataPoints:
+- sid: "15142466043926325685"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: "1"
+    - key: service_id
+      value:
+        str:
+          value: service_1
+    - key: name
+      value:
+        str:
+          value: service_name_1
+    - key: short_name
+      value:
+        str:
+          value: service_short_name_1
+    - key: service_group
+      value:
+        str:
+          value: group1
+    - key: layer
+      value:
+        int:
+          value: "1"
+  timestamp: "2024-11-16T11:17:00Z"
+  version: "1"
+- sid: "3906119849472468294"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: "2"
+    - key: service_id
+      value:
+        str:
+          value: service_2
+    - key: name
+      value:
+        str:
+          value: service_name_2
+    - key: short_name
+      value:
+        str:
+          value: service_short_name_2
+    - key: service_group
+      value:
+        str:
+          value: group1
+    - key: layer
+      value:
+        int:
+          value: "2"
+  timestamp: "2024-11-16T11:18:00Z"
+  version: "1"
+- sid: "12370392692163567533"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: "3"
+    - key: service_id
+      value:
+        str:
+          value: service_3
+    - key: name
+      value:
+        str:
+          value: service_name_3
+    - key: short_name
+      value:
+        str:
+          value: service_short_name_3
+    - key: service_group
+      value:
+        str:
+          value: group1
+    - key: layer
+      value:
+        int:
+          value: "1"
+  timestamp: "2024-11-16T11:19:00Z"
+  version: "1"
+- sid: "16450204962168035869"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: "4"
+    - key: service_id
+      value:
+        str:
+          value: service_4
+    - key: name
+      value:
+        str:
+          value: service_name_4
+    - key: short_name
+      value:
+        str:
+          value: service_short_name_4
+    - key: service_group
+      value:
+        str:
+          value: group4
+    - key: layer
+      value:
+        int:
+          value: "3"
+  timestamp: "2024-11-15T11:19:00Z"
+  version: "1"
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index bc31747d..39bd2bf3 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -73,6 +73,8 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match a tag belongs to the entity", helpers.Args{Input: 
"entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("all in all segments of index mode", helpers.Args{Input: 
"index_mode_all", Want: "index_mode_all_segs", Duration: 72 * time.Hour, 
Offset: -48 * time.Hour}),
        g.Entry("order by desc of index mode", helpers.Args{Input: 
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * 
time.Minute}),
        g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", 
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )

Reply via email to