This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 715674c4 Improve filtering performance of Stream (#440)
715674c4 is described below
commit 715674c40d00fbd42748eda3ebcfe7f04a2d290f
Author: Huang Youliang <[email protected]>
AuthorDate: Fri May 24 12:53:48 2024 +0800
Improve filtering performance of Stream (#440)
* Improve filtering performance of Stream
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
banyand/cmd/server/main.go | 2 +
banyand/measure/query.go | 39 ++-
banyand/stream/benchmark_test.go | 65 +---
banyand/stream/block.go | 71 ++--
banyand/stream/index.go | 3 +
banyand/stream/iter_builder.go | 22 +-
banyand/stream/query.go | 376 +++++++++++++--------
banyand/stream/stream.go | 4 +-
pkg/pb/v1/metadata.go | 26 --
pkg/query/executor/interface.go | 4 +-
.../logical/stream/stream_plan_indexscan_local.go | 14 +-
pkg/timestamp/range.go | 23 ++
test/cases/stream/data/input/order_asc.yaml | 27 ++
test/cases/stream/data/input/order_desc.yaml | 27 ++
test/cases/stream/data/want/order_asc.yaml | 103 ++++++
test/cases/stream/data/want/order_desc.yaml | 103 ++++++
test/cases/stream/stream.go | 2 +
17 files changed, 630 insertions(+), 281 deletions(-)
diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go
index 99827d13..3d7b6d77 100644
--- a/banyand/cmd/server/main.go
+++ b/banyand/cmd/server/main.go
@@ -21,12 +21,14 @@ package main
import (
"fmt"
"os"
+ "runtime"
"github.com/apache/skywalking-banyandb/pkg/cmdsetup"
"github.com/apache/skywalking-banyandb/pkg/signal"
)
func main() {
+ runtime.GOMAXPROCS(runtime.NumCPU() * 2)
if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 57bd381d..876bcb5a 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -373,21 +373,36 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
if len(qr.data) == 0 {
return nil
}
- // TODO:// Parallel load
- tmpBlock := generateBlock()
- defer releaseBlock(tmpBlock)
+
+ cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
- if !qr.data[i].loadData(tmpBlock) {
- qr.data = append(qr.data[:i], qr.data[i+1:]...)
- i--
- }
- if i < 0 {
- continue
- }
- if qr.orderByTimestampDesc() {
- qr.data[i].idx = len(qr.data[i].timestamps) - 1
+ go func(i int) {
+ tmpBlock := generateBlock()
+ defer releaseBlock(tmpBlock)
+ if !qr.data[i].loadData(tmpBlock) {
+ cursorChan <- i
+ return
+ }
+ if qr.orderByTimestampDesc() {
+ qr.data[i].idx =
len(qr.data[i].timestamps) - 1
+ }
+ cursorChan <- -1
+ }(i)
+ }
+
+ blankCursorList := []int{}
+ for completed := 0; completed < len(qr.data); completed++ {
+ result := <-cursorChan
+ if result != -1 {
+ blankCursorList = append(blankCursorList,
result)
}
}
+ sort.Slice(blankCursorList, func(i, j int) bool {
+ return blankCursorList[i] > blankCursorList[j]
+ })
+ for _, index := range blankCursorList {
+ qr.data = append(qr.data[:index], qr.data[index+1:]...)
+ }
qr.loaded = true
heap.Init(qr)
}
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index c9e8d868..4b0b4cbc 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -42,6 +42,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ logicalstream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -54,6 +55,7 @@ const (
)
type parameter struct {
+ scenario string
batchCount int
timestampCount int
seriesCount int
@@ -63,9 +65,9 @@ type parameter struct {
}
var pList = [3]parameter{
- {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 1, endTimestamp: 1000},
- {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 900, endTimestamp: 1000},
- {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 300, endTimestamp: 400},
+ {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 1, endTimestamp: 1000, scenario: "large-scale"},
+ {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 900, endTimestamp: 1000, scenario: "latest"},
+ {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality:
10, startTimestamp: 300, endTimestamp: 400, scenario: "historical"},
}
type mockIndex map[string]map[common.SeriesID]posting.List
@@ -287,47 +289,7 @@ func generateStream(db storage.TSDB[*tsTable, option])
*stream {
}
}
-func generateStreamFilterOptions(p parameter, index mockIndex)
pbv1.StreamFilterOptions {
- timeRange := timestamp.TimeRange{
- Start: time.Unix(int64(p.startTimestamp), 0),
- End: time.Unix(int64(p.endTimestamp), 0),
- IncludeStart: true,
- IncludeEnd: true,
- }
- entities := make([][]*modelv1.TagValue, 0)
- for i := 1; i <= p.seriesCount; i++ {
- entity := []*modelv1.TagValue{
- {
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value: entityTagValuePrefix +
strconv.Itoa(i),
- },
- },
- },
- }
- entities = append(entities, entity)
- }
- num := generateRandomNumber(int64(p.tagCardinality))
- value := filterTagValuePrefix + strconv.Itoa(num)
- filter := mockFilter{
- index: index,
- value: value,
- }
- tagProjection := pbv1.TagProjection{
- Family: "benchmark-family",
- Names: []string{"entity-tag", "filter-tag"},
- }
- return pbv1.StreamFilterOptions{
- Name: "benchmark",
- TimeRange: &timeRange,
- Entities: entities,
- Filter: filter,
- TagProjection: []pbv1.TagProjection{tagProjection},
- MaxElementSize: math.MaxInt32,
- }
-}
-
-func generateStreamSortOptions(p parameter, index mockIndex)
pbv1.StreamSortOptions {
+func generateStreamQueryOptions(p parameter, index mockIndex)
pbv1.StreamQueryOptions {
timeRange := timestamp.TimeRange{
Start: time.Unix(int64(p.startTimestamp), 0),
End: time.Unix(int64(p.endTimestamp), 0),
@@ -368,7 +330,7 @@ func generateStreamSortOptions(p parameter, index
mockIndex) pbv1.StreamSortOpti
Family: "benchmark-family",
Names: []string{"entity-tag", "filter-tag"},
}
- return pbv1.StreamSortOptions{
+ return pbv1.StreamQueryOptions{
Name: "benchmark",
TimeRange: &timeRange,
Entities: entities,
@@ -385,10 +347,11 @@ func BenchmarkFilter(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
- sfo := generateStreamFilterOptions(p, idx)
- b.Run("filter", func(b *testing.B) {
- _, err := s.Filter(context.TODO(), sfo)
+ sqo := generateStreamQueryOptions(p, idx)
+ b.Run("filter-"+p.scenario, func(b *testing.B) {
+ res, err := s.Filter(context.TODO(), sqo)
require.NoError(b, err)
+ logicalstream.BuildElementsFromStreamResult(res)
})
}
}
@@ -399,9 +362,9 @@ func BenchmarkSort(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
- sso := generateStreamSortOptions(p, idx)
- b.Run("sort", func(b *testing.B) {
- _, err := s.Sort(context.TODO(), sso)
+ sqo := generateStreamQueryOptions(p, idx)
+ b.Run("sort-"+p.scenario, func(b *testing.B) {
+ _, err := s.Sort(context.TODO(), sqo)
require.NoError(b, err)
})
}
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 63342b11..84b73b01 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -408,16 +408,17 @@ func releaseBlock(b *block) {
var blockPool sync.Pool
type blockCursor struct {
- p *part
- timestamps []int64
- elementIDs []string
- tagFamilies []tagFamily
- tagValuesDecoder encoding.BytesBlockDecoder
- tagProjection []pbv1.TagProjection
- bm blockMetadata
- idx int
- minTimestamp int64
- maxTimestamp int64
+ p *part
+ timestamps []int64
+ expectedTimestamps []int64
+ elementIDs []string
+ tagFamilies []tagFamily
+ tagValuesDecoder encoding.BytesBlockDecoder
+ tagProjection []pbv1.TagProjection
+ bm blockMetadata
+ idx int
+ minTimestamp int64
+ maxTimestamp int64
}
func (bc *blockCursor) reset() {
@@ -438,13 +439,17 @@ func (bc *blockCursor) reset() {
bc.tagFamilies = tff[:0]
}
-func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts
queryOptions) {
+func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) {
bc.reset()
bc.p = p
bc.bm.copyFrom(bm)
- bc.minTimestamp = queryOpts.minTimestamp
- bc.maxTimestamp = queryOpts.maxTimestamp
- bc.tagProjection = queryOpts.TagProjection
+ bc.minTimestamp = opts.minTimestamp
+ bc.maxTimestamp = opts.maxTimestamp
+ bc.tagProjection = opts.TagProjection
+ if opts.elementRefMap != nil {
+ seriesID := bc.bm.seriesID
+ bc.expectedTimestamps = opts.elementRefMap[seriesID]
+ }
}
func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
@@ -543,12 +548,30 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
bc.bm.tagFamilies = tf
tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)
- start, end, ok := timestamp.FindRange(tmpBlock.timestamps,
bc.minTimestamp, bc.maxTimestamp)
- if !ok {
- return false
+ idxList := make([]int, 0)
+ var start, end int
+ if bc.expectedTimestamps != nil {
+ for _, ts := range bc.expectedTimestamps {
+ idx := timestamp.Find(tmpBlock.timestamps, ts)
+ if idx == -1 {
+ continue
+ }
+ idxList = append(idxList, idx)
+ bc.timestamps = append(bc.timestamps,
tmpBlock.timestamps[idx])
+ bc.elementIDs = append(bc.elementIDs,
tmpBlock.elementIDs[idx])
+ }
+ if len(bc.timestamps) == 0 {
+ return false
+ }
+ } else {
+ s, e, ok := timestamp.FindRange(tmpBlock.timestamps,
bc.minTimestamp, bc.maxTimestamp)
+ start, end = s, e
+ if !ok {
+ return false
+ }
+ bc.timestamps = append(bc.timestamps,
tmpBlock.timestamps[s:e+1]...)
+ bc.elementIDs = append(bc.elementIDs,
tmpBlock.elementIDs[s:e+1]...)
}
- bc.timestamps = append(bc.timestamps,
tmpBlock.timestamps[start:end+1]...)
- bc.elementIDs = append(bc.elementIDs,
tmpBlock.elementIDs[start:end+1]...)
for i, projection := range bc.bm.tagProjection {
tf := tagFamily{
@@ -559,13 +582,19 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
t := tag{
name: name,
}
- if tmpBlock.tagFamilies[i].tags[blockIndex].name ==
name {
+ if len(tmpBlock.tagFamilies[i].tags) != 0 &&
tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
t.valueType =
tmpBlock.tagFamilies[i].tags[blockIndex].valueType
if
len(tmpBlock.tagFamilies[i].tags[blockIndex].values) !=
len(tmpBlock.timestamps) {
logger.Panicf("unexpected number of
values for tags %q: got %d; want %d",
tmpBlock.tagFamilies[i].tags[blockIndex].name,
len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
}
- t.values = append(t.values,
tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
+ if bc.expectedTimestamps != nil {
+ for _, idx := range idxList {
+ t.values = append(t.values,
tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
+ }
+ } else {
+ t.values = append(t.values,
tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
+ }
}
blockIndex++
tf.tags = append(tf.tags, t)
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index a6f31db1..7c02bf0d 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -87,6 +87,9 @@ func (e *elementIndex) Search(_ context.Context, seriesList
pbv1.SeriesList, fil
if pl == nil {
pl = roaring.DummyPostingList
}
+ if pl.IsEmpty() {
+ continue
+ }
timestamps := pl.ToSlice()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go
index 62fca5f0..f225acf9 100644
--- a/banyand/stream/iter_builder.go
+++ b/banyand/stream/iter_builder.go
@@ -31,20 +31,20 @@ import (
type filterFn func(itemID uint64) bool
func (s *stream) buildSeriesByIndex(tableWrappers
[]storage.TSTableWrapper[*tsTable],
- seriesList pbv1.SeriesList, sso pbv1.StreamSortOptions,
+ seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
) (series []*searcherIterator, err error) {
timeFilter := func(itemID uint64) bool {
- return sso.TimeRange.Contains(int64(itemID))
+ return sqo.TimeRange.Contains(int64(itemID))
}
- indexRuleForSorting := sso.Order.Index
+ indexRuleForSorting := sqo.Order.Index
if len(indexRuleForSorting.Tags) != 1 {
return nil, fmt.Errorf("only support one tag for sorting, but
got %d", len(indexRuleForSorting.Tags))
}
sortedTag := indexRuleForSorting.Tags[0]
tl := newTagLocation()
- for i := range sso.TagProjection {
- for j := range sso.TagProjection[i].Names {
- if sso.TagProjection[i].Names[j] == sortedTag {
+ for i := range sqo.TagProjection {
+ for j := range sqo.TagProjection[i].Names {
+ if sqo.TagProjection[i].Names[j] == sortedTag {
tl.familyIndex, tl.tagIndex = i, j
}
}
@@ -52,13 +52,13 @@ func (s *stream) buildSeriesByIndex(tableWrappers
[]storage.TSTableWrapper[*tsTa
if !tl.valid() {
return nil, fmt.Errorf("sorted tag %s not found in tag
projection", sortedTag)
}
- entityMap, tagSpecIndex, tagProjIndex, sidToIndex :=
s.genIndex(sso.TagProjection, seriesList)
+ entityMap, tagSpecIndex, tagProjIndex, sidToIndex :=
s.genIndex(sqo.TagProjection, seriesList)
sids := seriesList.IDs()
for _, tw := range tableWrappers {
seriesFilter := make(map[common.SeriesID]filterFn)
- if sso.Filter != nil {
+ if sqo.Filter != nil {
for i := range sids {
- pl, errExe := sso.Filter.Execute(func(_
databasev1.IndexRule_Type) (index.Searcher, error) {
+ pl, errExe := sqo.Filter.Execute(func(_
databasev1.IndexRule_Type) (index.Searcher, error) {
return tw.Table().Index().store, nil
}, sids[i])
if errExe != nil {
@@ -79,14 +79,14 @@ func (s *stream) buildSeriesByIndex(tableWrappers
[]storage.TSTableWrapper[*tsTa
IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
Analyzer: indexRuleForSorting.GetAnalyzer(),
}
- inner, err = tw.Table().Index().Sort(sids, fieldKey,
sso.Order.Sort, sso.MaxElementSize)
+ inner, err = tw.Table().Index().Sort(sids, fieldKey,
sqo.Order.Sort, sqo.MaxElementSize)
if err != nil {
return nil, err
}
if inner != nil {
series = append(series, newSearcherIterator(s.l, inner,
tw.Table(),
- seriesFilter, timeFilter, sso.TagProjection, tl,
+ seriesFilter, timeFilter, sqo.TagProjection, tl,
tagSpecIndex, tagProjIndex, sidToIndex,
seriesList, entityMap))
}
}
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 06a27443..ce29a725 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -38,6 +38,7 @@ import (
)
type queryOptions struct {
+ elementRefMap map[common.SeriesID][]int64
pbv1.StreamQueryOptions
minTimestamp int64
maxTimestamp int64
@@ -156,58 +157,77 @@ func (qr *queryResult) Pull() *pbv1.StreamResult {
if len(qr.data) == 0 {
return nil
}
- // TODO:// Parallel load
- tmpBlock := generateBlock()
- defer releaseBlock(tmpBlock)
+
+ cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
- if !qr.data[i].loadData(tmpBlock) {
- qr.data = append(qr.data[:i], qr.data[i+1:]...)
- i--
- }
- if qr.schema.GetEntity() == nil ||
len(qr.schema.GetEntity().GetTagNames()) == 0 {
- continue
- }
- sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
- series := qr.seriesList[sidIndex]
- entityMap := make(map[string]int)
- tagFamilyMap := make(map[string]int)
- for idx, entity := range
qr.schema.GetEntity().GetTagNames() {
- entityMap[entity] = idx + 1
- }
- for idx, tagFamily := range qr.data[i].tagFamilies {
- tagFamilyMap[tagFamily.name] = idx + 1
- }
- for _, tagFamilyProj := range qr.data[i].tagProjection {
- for j, tagProj := range tagFamilyProj.Names {
- offset := qr.tagNameIndex[tagProj]
- tagFamilySpec :=
qr.schema.GetTagFamilies()[offset.FamilyOffset]
- tagSpec :=
tagFamilySpec.GetTags()[offset.TagOffset]
- if tagSpec.IndexedOnly {
- continue
- }
- entityPos := entityMap[tagProj]
- tagFamilyPos :=
tagFamilyMap[tagFamilyProj.Family]
- if entityPos == 0 {
- continue
- }
- if tagFamilyPos == 0 {
-
qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{
- name:
tagFamilyProj.Family,
- tags: make([]tag, 0),
+ go func(i int) {
+ tmpBlock := generateBlock()
+ defer releaseBlock(tmpBlock)
+ if !qr.data[i].loadData(tmpBlock) {
+ cursorChan <- i
+ return
+ }
+ if qr.schema.GetEntity() == nil ||
len(qr.schema.GetEntity().GetTagNames()) == 0 {
+ cursorChan <- -1
+ return
+ }
+ sidIndex :=
qr.sidToIndex[qr.data[i].bm.seriesID]
+ series := qr.seriesList[sidIndex]
+ entityMap := make(map[string]int)
+ tagFamilyMap := make(map[string]int)
+ for idx, entity := range
qr.schema.GetEntity().GetTagNames() {
+ entityMap[entity] = idx + 1
+ }
+ for idx, tagFamily := range
qr.data[i].tagFamilies {
+ tagFamilyMap[tagFamily.name] = idx + 1
+ }
+ for _, tagFamilyProj := range
qr.data[i].tagProjection {
+ for j, tagProj := range
tagFamilyProj.Names {
+ offset :=
qr.tagNameIndex[tagProj]
+ tagFamilySpec :=
qr.schema.GetTagFamilies()[offset.FamilyOffset]
+ tagSpec :=
tagFamilySpec.GetTags()[offset.TagOffset]
+ if tagSpec.IndexedOnly {
+ continue
+ }
+ entityPos := entityMap[tagProj]
+ tagFamilyPos :=
tagFamilyMap[tagFamilyProj.Family]
+ if entityPos == 0 {
+ continue
+ }
+ if tagFamilyPos == 0 {
+
qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{
+ name:
tagFamilyProj.Family,
+ tags:
make([]tag, 0),
+ }
+ }
+ valueType :=
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
+
qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{
+ name: tagProj,
+ values:
mustEncodeTagValue(tagProj, tagSpec.GetType(),
series.EntityValues[entityPos-1], len(qr.data[i].timestamps)),
+ valueType: valueType,
}
- }
- valueType :=
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
-
qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{
- name: tagProj,
- values:
mustEncodeTagValue(tagProj, tagSpec.GetType(),
series.EntityValues[entityPos-1], len(qr.data[i].timestamps)),
- valueType: valueType,
}
}
- }
- if qr.orderByTimestampDesc() {
- qr.data[i].idx = len(qr.data[i].timestamps) - 1
+ if qr.orderByTimestampDesc() {
+ qr.data[i].idx =
len(qr.data[i].timestamps) - 1
+ }
+ cursorChan <- -1
+ }(i)
+ }
+
+ blankCursorList := []int{}
+ for completed := 0; completed < len(qr.data); completed++ {
+ result := <-cursorChan
+ if result != -1 {
+ blankCursorList = append(blankCursorList,
result)
}
}
+ sort.Slice(blankCursorList, func(i, j int) bool {
+ return blankCursorList[i] > blankCursorList[j]
+ })
+ for _, index := range blankCursorList {
+ qr.data = append(qr.data[:index], qr.data[index+1:]...)
+ }
qr.loaded = true
heap.Init(qr)
}
@@ -343,88 +363,118 @@ func (s *stream) genIndex(tagProj []pbv1.TagProjection,
seriesList pbv1.SeriesLi
return entityMap, tagSpecIndex, tagProjIndex, sidToIndex
}
-func (s *stream) Filter(ctx context.Context, sfo pbv1.StreamFilterOptions)
(sfr pbv1.StreamFilterResult, err error) {
- if sfo.TimeRange == nil || len(sfo.Entities) < 1 {
+func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error) {
+ if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and
series are required")
}
- if len(sfo.TagProjection) == 0 {
+ if len(sqo.TagProjection) == 0 {
return nil, errors.New("invalid query options: tagProjection is
required")
}
db := s.databaseSupplier.SupplyTSDB()
+ var result queryResult
if db == nil {
- return sfr, nil
+ return &result, nil
}
tsdb := db.(storage.TSDB[*tsTable, option])
- tabWrappers := tsdb.SelectTSTables(*sfo.TimeRange)
- sort.Slice(tabWrappers, func(i, j int) bool {
- return
tabWrappers[i].GetTimeRange().Start.Before(tabWrappers[j].GetTimeRange().Start)
- })
+ tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
defer func() {
for i := range tabWrappers {
tabWrappers[i].DecRef()
}
}()
-
- series := make([]*pbv1.Series, len(sfo.Entities))
- for i := range sfo.Entities {
+ series := make([]*pbv1.Series, len(sqo.Entities))
+ for i := range sqo.Entities {
series[i] = &pbv1.Series{
- Subject: sfo.Name,
- EntityValues: sfo.Entities[i],
+ Subject: sqo.Name,
+ EntityValues: sqo.Entities[i],
}
}
- seriesList, err := tsdb.Lookup(ctx, series)
+ sl, err := tsdb.Lookup(ctx, series)
if err != nil {
return nil, err
}
- if len(seriesList) == 0 {
- return sfr, nil
- }
- entityMap, tagSpecIndex, tagProjIndex, sidToIndex :=
s.genIndex(sfo.TagProjection, seriesList)
- ces := newColumnElements()
- for _, tw := range tabWrappers {
- if len(ces.timestamp) >= sfo.MaxElementSize {
- break
- }
- index := tw.Table().Index()
- erl, err := index.Search(ctx, seriesList, sfo.Filter,
sfo.TimeRange)
- if err != nil {
- return nil, err
+ if len(sl) < 1 {
+ return &result, nil
+ }
+ var sids []common.SeriesID
+ for i := range sl {
+ sids = append(sids, sl[i].ID)
+ }
+ var parts []*part
+ qo := queryOptions{
+ StreamQueryOptions: sqo,
+ minTimestamp: sqo.TimeRange.Start.UnixNano(),
+ maxTimestamp: sqo.TimeRange.End.UnixNano(),
+ }
+ var n int
+ for i := range tabWrappers {
+ s := tabWrappers[i].Table().currentSnapshot()
+ if s == nil {
+ continue
}
- if len(ces.timestamp)+len(erl) > sfo.MaxElementSize {
- erl = erl[:sfo.MaxElementSize-len(ces.timestamp)]
+ parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
+ if n < 1 {
+ s.decRef()
+ continue
}
- for _, er := range erl {
- e, count, err := tw.Table().getElement(er.seriesID,
er.timestamp, sfo.TagProjection)
- if err != nil {
- return nil, err
- }
- if len(tagProjIndex) != 0 {
- for entity, offset := range tagProjIndex {
- tagSpec := tagSpecIndex[entity]
- if tagSpec.IndexedOnly {
- continue
- }
- series :=
seriesList[sidToIndex[er.seriesID]]
- entityPos := entityMap[entity] - 1
-
e.tagFamilies[offset.FamilyOffset].tags[offset.TagOffset] = tag{
- name: entity,
- values:
mustEncodeTagValue(entity, tagSpec.GetType(), series.EntityValues[entityPos],
count),
- valueType:
pbv1.MustTagValueToValueType(series.EntityValues[entityPos]),
- }
- }
+ result.snapshots = append(result.snapshots, s)
+ }
+ bma := generateBlockMetadataArray()
+ defer releaseBlockMetadataArray(bma)
+ // TODO: cache tstIter
+ var ti tstIter
+ defer ti.reset()
+ originalSids := make([]common.SeriesID, len(sids))
+ copy(originalSids, sids)
+ sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
+ ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+ if ti.Error() != nil {
+ return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error())
+ }
+ for ti.nextBlock() {
+ bc := generateBlockCursor()
+ p := ti.piHeap[0]
+ bc.init(p.p, p.curBlock, qo)
+ result.data = append(result.data, bc)
+ }
+ if ti.Error() != nil {
+ return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
+ }
+
+ entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl)
+ result.entityMap = entityMap
+ result.sidToIndex = sidToIndex
+ result.tagNameIndex = make(map[string]partition.TagLocator)
+ result.schema = s.schema
+ result.seriesList = sl
+ for i, si := range originalSids {
+ result.sidToIndex[si] = i
+ }
+ for i, tagFamilySpec := range s.schema.GetTagFamilies() {
+ for j, tagSpec := range tagFamilySpec.GetTags() {
+ result.tagNameIndex[tagSpec.GetName()] =
partition.TagLocator{
+ FamilyOffset: i,
+ TagOffset: j,
}
- ces.BuildFromElement(e, sfo.TagProjection)
}
}
- return ces, nil
+ result.orderByTS = true
+ if sqo.Order == nil {
+ result.ascTS = true
+ return &result, nil
+ }
+ if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
+ result.ascTS = true
+ }
+ return &result, nil
}
-func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr
pbv1.StreamSortResult, err error) {
- if sso.TimeRange == nil || len(sso.Entities) < 1 {
+func (s *stream) Sort(ctx context.Context, sqo pbv1.StreamQueryOptions) (ssr
pbv1.StreamSortResult, err error) {
+ if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and
series are required")
}
- if len(sso.TagProjection) == 0 {
+ if len(sqo.TagProjection) == 0 {
return nil, errors.New("invalid query options: tagProjection is
required")
}
db := s.databaseSupplier.SupplyTSDB()
@@ -432,17 +482,18 @@ func (s *stream) Sort(ctx context.Context, sso
pbv1.StreamSortOptions) (ssr pbv1
return ssr, nil
}
tsdb := db.(storage.TSDB[*tsTable, option])
- tabWrappers := tsdb.SelectTSTables(*sso.TimeRange)
+ tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
defer func() {
for i := range tabWrappers {
tabWrappers[i].DecRef()
}
}()
- series := make([]*pbv1.Series, len(sso.Entities))
- for i := range sso.Entities {
+
+ series := make([]*pbv1.Series, len(sqo.Entities))
+ for i := range sqo.Entities {
series[i] = &pbv1.Series{
- Subject: sso.Name,
- EntityValues: sso.Entities[i],
+ Subject: sqo.Name,
+ EntityValues: sqo.Entities[i],
}
}
seriesList, err := tsdb.Lookup(ctx, series)
@@ -453,16 +504,15 @@ func (s *stream) Sort(ctx context.Context, sso
pbv1.StreamSortOptions) (ssr pbv1
return ssr, nil
}
- iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sso)
+ iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sqo)
if err != nil {
return nil, err
}
-
if len(iters) == 0 {
return ssr, nil
}
- it := newItemIter(iters, sso.Order.Sort)
+ it := newItemIter(iters, sqo.Order.Sort)
defer func() {
err = multierr.Append(err, it.Close())
}()
@@ -471,15 +521,27 @@ func (s *stream) Sort(ctx context.Context, sso
pbv1.StreamSortOptions) (ssr pbv1
for it.Next() {
nextItem := it.Val()
e := nextItem.element
- ces.BuildFromElement(e, sso.TagProjection)
- if len(ces.timestamp) >= sso.MaxElementSize {
+ ces.BuildFromElement(e, sqo.TagProjection)
+ if len(ces.timestamp) >= sqo.MaxElementSize {
break
}
}
return ces, err
}
-func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error) {
+// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by
input sorting order.
+func newItemIter(iters []*searcherIterator, s modelv1.Sort)
itersort.Iterator[item] {
+ var ii []itersort.Iterator[item]
+ for _, iter := range iters {
+ ii = append(ii, iter)
+ }
+ if s == modelv1.Sort_SORT_DESC {
+ return itersort.NewItemIter[item](ii, true)
+ }
+ return itersort.NewItemIter[item](ii, false)
+}
+
+func (s *stream) Filter(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr
pbv1.StreamQueryResult, err error) {
if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and
series are required")
}
@@ -489,7 +551,7 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (pbv1.S
db := s.databaseSupplier.SupplyTSDB()
var result queryResult
if db == nil {
- return &result, nil
+ return sqr, nil
}
tsdb := db.(storage.TSDB[*tsTable, option])
tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
@@ -498,6 +560,7 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (pbv1.S
tabWrappers[i].DecRef()
}
}()
+
series := make([]*pbv1.Series, len(sqo.Entities))
for i := range sqo.Entities {
series[i] = &pbv1.Series{
@@ -505,24 +568,46 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (pbv1.S
EntityValues: sqo.Entities[i],
}
}
- sl, err := tsdb.Lookup(ctx, series)
+ seriesList, err := tsdb.Lookup(ctx, series)
if err != nil {
return nil, err
}
+ if len(seriesList) == 0 {
+ return sqr, nil
+ }
- if len(sl) < 1 {
- return &result, nil
+ var elementRefList []elementRef
+ for _, tw := range tabWrappers {
+ index := tw.Table().Index()
+ erl, err := index.Search(ctx, seriesList, sqo.Filter,
sqo.TimeRange)
+ if err != nil {
+ return nil, err
+ }
+ elementRefList = append(elementRefList, erl...)
+ if len(elementRefList) > sqo.MaxElementSize {
+ elementRefList = elementRefList[:sqo.MaxElementSize]
+ break
+ }
}
- var sids []common.SeriesID
- for i := range sl {
- sids = append(sids, sl[i].ID)
+ var elementRefMap map[common.SeriesID][]int64
+ if len(elementRefList) != 0 {
+ elementRefMap = make(map[common.SeriesID][]int64)
+ for _, ref := range elementRefList {
+ if _, ok := elementRefMap[ref.seriesID]; !ok {
+ elementRefMap[ref.seriesID] =
[]int64{ref.timestamp}
+ } else {
+ elementRefMap[ref.seriesID] =
append(elementRefMap[ref.seriesID], ref.timestamp)
+ }
+ }
}
- var parts []*part
qo := queryOptions{
StreamQueryOptions: sqo,
minTimestamp: sqo.TimeRange.Start.UnixNano(),
maxTimestamp: sqo.TimeRange.End.UnixNano(),
+ elementRefMap: elementRefMap,
}
+
+ var parts []*part
var n int
for i := range tabWrappers {
s := tabWrappers[i].Table().currentSnapshot()
@@ -539,31 +624,41 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (pbv1.S
bma := generateBlockMetadataArray()
defer releaseBlockMetadataArray(bma)
// TODO: cache tstIter
- var tstIter tstIter
- defer tstIter.reset()
+ var ti tstIter
+ defer ti.reset()
+ var sids []common.SeriesID
+ for i := 0; i < len(seriesList); i++ {
+ sid := seriesList[i].ID
+ if _, ok := elementRefMap[sid]; !ok {
+ seriesList = append(seriesList[:i], seriesList[i+1:]...)
+ i--
+ continue
+ }
+ sids = append(sids, sid)
+ }
originalSids := make([]common.SeriesID, len(sids))
copy(originalSids, sids)
sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
- tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
- if tstIter.Error() != nil {
- return nil, fmt.Errorf("cannot init tstIter: %w",
tstIter.Error())
+ ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+ if ti.Error() != nil {
+ return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error())
}
- for tstIter.nextBlock() {
+ for ti.nextBlock() {
bc := generateBlockCursor()
- p := tstIter.piHeap[0]
+ p := ti.piHeap[0]
bc.init(p.p, p.curBlock, qo)
result.data = append(result.data, bc)
}
- if tstIter.Error() != nil {
- return nil, fmt.Errorf("cannot iterate tstIter: %w",
tstIter.Error())
+ if ti.Error() != nil {
+ return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
}
- entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl)
+ entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, seriesList)
result.entityMap = entityMap
result.sidToIndex = sidToIndex
result.tagNameIndex = make(map[string]partition.TagLocator)
result.schema = s.schema
- result.seriesList = sl
+ result.seriesList = seriesList
for i, si := range originalSids {
result.sidToIndex[si] = i
}
@@ -575,30 +670,13 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (pbv1.S
}
}
}
+ result.orderByTS = true
if sqo.Order == nil {
- result.orderByTS = true
result.ascTS = true
return &result, nil
}
- if sqo.Order.Index == nil {
- result.orderByTS = true
- if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
- result.ascTS = true
- }
- return &result, nil
+ if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
+ result.ascTS = true
}
-
return &result, nil
}
-
-// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by
input sorting order.
-func newItemIter(iters []*searcherIterator, s modelv1.Sort)
itersort.Iterator[item] {
- var ii []itersort.Iterator[item]
- for _, iter := range iters {
- ii = append(ii, iter)
- }
- if s == modelv1.Sort_SORT_DESC {
- return itersort.NewItemIter[item](ii, true)
- }
- return itersort.NewItemIter[item](ii, false)
-}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 0ba53a9d..8c6f1b7a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -61,8 +61,8 @@ type Stream interface {
GetSchema() *databasev1.Stream
GetIndexRules() []*databasev1.IndexRule
Query(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
- Sort(ctx context.Context, opts pbv1.StreamSortOptions)
(pbv1.StreamSortResult, error)
- Filter(ctx context.Context, opts pbv1.StreamFilterOptions)
(pbv1.StreamFilterResult, error)
+ Sort(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamSortResult, error)
+ Filter(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
}
var _ Stream = (*stream)(nil)
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index dce4d832..140a0c32 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -132,27 +132,6 @@ type TagProjection struct {
// StreamQueryOptions is the options of a stream query.
type StreamQueryOptions struct {
- Name string
- TimeRange *timestamp.TimeRange
- Entities [][]*modelv1.TagValue
- Filter index.Filter
- Order *OrderBy
- TagProjection []TagProjection
-}
-
-// StreamSortOptions is the options of a stream sort.
-type StreamSortOptions struct {
- Name string
- TimeRange *timestamp.TimeRange
- Entities [][]*modelv1.TagValue
- Filter index.Filter
- Order *OrderBy
- TagProjection []TagProjection
- MaxElementSize int
-}
-
-// StreamFilterOptions is the options of a stream filter.
-type StreamFilterOptions struct {
Name string
TimeRange *timestamp.TimeRange
Entities [][]*modelv1.TagValue
@@ -173,11 +152,6 @@ type StreamSortResult interface {
Pull() *StreamColumnResult
}
-// StreamFilterResult is the result of a stream filter.
-type StreamFilterResult interface {
- Pull() *StreamColumnResult
-}
-
// MeasureQueryOptions is the options of a measure query.
type MeasureQueryOptions struct {
Name string
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 40b73dde..a5edf44c 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -31,8 +31,8 @@ import (
// StreamExecutionContext allows retrieving data through the stream module.
type StreamExecutionContext interface {
Query(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
- Sort(ctx context.Context, opts pbv1.StreamSortOptions)
(pbv1.StreamSortResult, error)
- Filter(ctx context.Context, opts pbv1.StreamFilterOptions)
(pbv1.StreamFilterResult, error)
+ Sort(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamSortResult, error)
+ Filter(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
}
// StreamExecutionContextKey is the key of stream execution context in
context.Context.
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 99ce11f4..2e7a39fb 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -73,7 +73,7 @@ func (i *localIndexScan) Execute(ctx context.Context)
([]*streamv1.Element, erro
ec := executor.FromStreamExecutionContext(ctx)
if i.order != nil && i.order.Index != nil {
- ssr, err := ec.Sort(ctx, pbv1.StreamSortOptions{
+ ssr, err := ec.Sort(ctx, pbv1.StreamQueryOptions{
Name: i.metadata.GetName(),
TimeRange: &i.timeRange,
Entities: i.entities,
@@ -93,7 +93,7 @@ func (i *localIndexScan) Execute(ctx context.Context)
([]*streamv1.Element, erro
}
if i.filter != nil && i.filter != logical.ENode {
- sfr, err := ec.Filter(ctx, pbv1.StreamFilterOptions{
+ result, err := ec.Filter(ctx, pbv1.StreamQueryOptions{
Name: i.metadata.GetName(),
TimeRange: &i.timeRange,
Entities: i.entities,
@@ -105,11 +105,10 @@ func (i *localIndexScan) Execute(ctx context.Context)
([]*streamv1.Element, erro
if err != nil {
return nil, err
}
- if sfr == nil {
+ if result == nil {
return nil, nil
}
- r := sfr.Pull()
- return buildElementsFromColumnResult(r), nil
+ return BuildElementsFromStreamResult(result), nil
}
result, err := ec.Query(ctx, pbv1.StreamQueryOptions{
@@ -123,7 +122,7 @@ func (i *localIndexScan) Execute(ctx context.Context)
([]*streamv1.Element, erro
if err != nil {
return nil, fmt.Errorf("failed to query stream: %w", err)
}
- return buildElementsFromQueryResults(result), nil
+ return BuildElementsFromStreamResult(result), nil
}
func (i *localIndexScan) String() string {
@@ -167,7 +166,8 @@ func buildElementsFromColumnResult(r
*pbv1.StreamColumnResult) (elements []*stre
return
}
-func buildElementsFromQueryResults(result pbv1.StreamQueryResult) (elements
[]*streamv1.Element) {
+// BuildElementsFromStreamResult builds a slice of elements from the given
stream query result.
+func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements
[]*streamv1.Element) {
deduplication := make(map[string]struct{})
for {
r := result.Pull()
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index 77603ed0..aaf719bb 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -148,3 +148,26 @@ func FindRange[T int64 | uint64](timestamps []T, min, max
T) (int, int, bool) {
}
return start, end, start <= end
}
+
+// Find returns the index of the target in the sorted 'timestamps' slice.
+func Find(timestamps []int64, target int64) int {
+ if len(timestamps) == 0 {
+ return -1
+ }
+ if timestamps[0] > target || timestamps[len(timestamps)-1] < target {
+ return -1
+ }
+ left, right := 0, len(timestamps)-1
+ for left <= right {
+ mid := (left + right) / 2
+ if timestamps[mid] == target {
+ return mid
+ }
+ if timestamps[mid] < target {
+ left = mid + 1
+ } else {
+ right = mid - 1
+ }
+ }
+ return -1
+}
diff --git a/test/cases/stream/data/input/order_asc.yaml
b/test/cases/stream/data/input/order_asc.yaml
new file mode 100644
index 00000000..9963ac2d
--- /dev/null
+++ b/test/cases/stream/data/input/order_asc.yaml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+ tagFamilies:
+ - name: "searchable"
+ tags: ["trace_id", "duration"]
+ - name: "data"
+ tags: ["data_binary"]
+orderBy:
+ sort: "SORT_ASC"
diff --git a/test/cases/stream/data/input/order_desc.yaml
b/test/cases/stream/data/input/order_desc.yaml
new file mode 100644
index 00000000..3c2d97d3
--- /dev/null
+++ b/test/cases/stream/data/input/order_desc.yaml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+ tagFamilies:
+ - name: "searchable"
+ tags: ["trace_id", "duration"]
+ - name: "data"
+ tags: ["data_binary"]
+orderBy:
+ sort: "SORT_DESC"
diff --git a/test/cases/stream/data/want/order_asc.yaml
b/test/cases/stream/data/want/order_asc.yaml
new file mode 100644
index 00000000..725cf084
--- /dev/null
+++ b/test/cases/stream/data/want/order_asc.yaml
@@ -0,0 +1,103 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+ - elementId: "0"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "1"
+ - key: duration
+ value:
+ int:
+ value: "1000"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "1"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "2"
+ - key: duration
+ value:
+ int:
+ value: "500"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "2"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "3"
+ - key: duration
+ value:
+ int:
+ value: "30"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "3"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "4"
+ - key: duration
+ value:
+ int:
+ value: "60"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "4"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "5"
+ - key: duration
+ value:
+ int:
+ value: "300"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
diff --git a/test/cases/stream/data/want/order_desc.yaml
b/test/cases/stream/data/want/order_desc.yaml
new file mode 100644
index 00000000..055f9449
--- /dev/null
+++ b/test/cases/stream/data/want/order_desc.yaml
@@ -0,0 +1,103 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+ - elementId: "4"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "5"
+ - key: duration
+ value:
+ int:
+ value: "300"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "3"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "4"
+ - key: duration
+ value:
+ int:
+ value: "60"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "2"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "3"
+ - key: duration
+ value:
+ int:
+ value: "30"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "1"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "2"
+ - key: duration
+ value:
+ int:
+ value: "500"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+ - elementId: "0"
+ tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: "1"
+ - key: duration
+ value:
+ int:
+ value: "1000"
+ - name: data
+ tags:
+ - key: data_binary
+ value:
+ binaryData: YWJjMTIzIT8kKiYoKSctPUB+
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 66408bdd..200a9979 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -49,6 +49,8 @@ var _ = g.DescribeTable("Scanning Streams", func(args
helpers.Args) {
g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 *
time.Hour}),
g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 *
time.Hour}),
+ g.Entry("order asc", helpers.Args{Input: "order_asc", Duration: 1 *
time.Hour}),
+ g.Entry("order desc", helpers.Args{Input: "order_desc", Duration: 1 *
time.Hour}),
g.Entry("nothing", helpers.Args{
Input: "all",
Begin: timestamppb.New(time.Unix(0,
0).Truncate(time.Millisecond)),