This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 715674c4 Improve filtering performance of Stream (#440)
715674c4 is described below

commit 715674c40d00fbd42748eda3ebcfe7f04a2d290f
Author: Huang Youliang <[email protected]>
AuthorDate: Fri May 24 12:53:48 2024 +0800

    Improve filtering performance of Stream (#440)
    
    * Improve filtering performance of Stream
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 banyand/cmd/server/main.go                         |   2 +
 banyand/measure/query.go                           |  39 ++-
 banyand/stream/benchmark_test.go                   |  65 +---
 banyand/stream/block.go                            |  71 ++--
 banyand/stream/index.go                            |   3 +
 banyand/stream/iter_builder.go                     |  22 +-
 banyand/stream/query.go                            | 376 +++++++++++++--------
 banyand/stream/stream.go                           |   4 +-
 pkg/pb/v1/metadata.go                              |  26 --
 pkg/query/executor/interface.go                    |   4 +-
 .../logical/stream/stream_plan_indexscan_local.go  |  14 +-
 pkg/timestamp/range.go                             |  23 ++
 test/cases/stream/data/input/order_asc.yaml        |  27 ++
 test/cases/stream/data/input/order_desc.yaml       |  27 ++
 test/cases/stream/data/want/order_asc.yaml         | 103 ++++++
 test/cases/stream/data/want/order_desc.yaml        | 103 ++++++
 test/cases/stream/stream.go                        |   2 +
 17 files changed, 630 insertions(+), 281 deletions(-)

diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go
index 99827d13..3d7b6d77 100644
--- a/banyand/cmd/server/main.go
+++ b/banyand/cmd/server/main.go
@@ -21,12 +21,14 @@ package main
 import (
        "fmt"
        "os"
+       "runtime"
 
        "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
        "github.com/apache/skywalking-banyandb/pkg/signal"
 )
 
 func main() {
+       runtime.GOMAXPROCS(runtime.NumCPU() * 2)
        if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
                _, _ = fmt.Fprintln(os.Stderr, err)
                os.Exit(1)
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 57bd381d..876bcb5a 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -373,21 +373,36 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
                if len(qr.data) == 0 {
                        return nil
                }
-               // TODO:// Parallel load
-               tmpBlock := generateBlock()
-               defer releaseBlock(tmpBlock)
+
+               cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
-                       if !qr.data[i].loadData(tmpBlock) {
-                               qr.data = append(qr.data[:i], qr.data[i+1:]...)
-                               i--
-                       }
-                       if i < 0 {
-                               continue
-                       }
-                       if qr.orderByTimestampDesc() {
-                               qr.data[i].idx = len(qr.data[i].timestamps) - 1
+                       go func(i int) {
+                               tmpBlock := generateBlock()
+                               defer releaseBlock(tmpBlock)
+                               if !qr.data[i].loadData(tmpBlock) {
+                                       cursorChan <- i
+                                       return
+                               }
+                               if qr.orderByTimestampDesc() {
+                                       qr.data[i].idx = 
len(qr.data[i].timestamps) - 1
+                               }
+                               cursorChan <- -1
+                       }(i)
+               }
+
+               blankCursorList := []int{}
+               for completed := 0; completed < len(qr.data); completed++ {
+                       result := <-cursorChan
+                       if result != -1 {
+                               blankCursorList = append(blankCursorList, 
result)
                        }
                }
+               sort.Slice(blankCursorList, func(i, j int) bool {
+                       return blankCursorList[i] > blankCursorList[j]
+               })
+               for _, index := range blankCursorList {
+                       qr.data = append(qr.data[:index], qr.data[index+1:]...)
+               }
                qr.loaded = true
                heap.Init(qr)
        }
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index c9e8d868..4b0b4cbc 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -42,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -54,6 +55,7 @@ const (
 )
 
 type parameter struct {
+       scenario       string
        batchCount     int
        timestampCount int
        seriesCount    int
@@ -63,9 +65,9 @@ type parameter struct {
 }
 
 var pList = [3]parameter{
-       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 1, endTimestamp: 1000},
-       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 900, endTimestamp: 1000},
-       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 300, endTimestamp: 400},
+       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 1, endTimestamp: 1000, scenario: "large-scale"},
+       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 900, endTimestamp: 1000, scenario: "latest"},
+       {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 
10, startTimestamp: 300, endTimestamp: 400, scenario: "historical"},
 }
 
 type mockIndex map[string]map[common.SeriesID]posting.List
@@ -287,47 +289,7 @@ func generateStream(db storage.TSDB[*tsTable, option]) 
*stream {
        }
 }
 
-func generateStreamFilterOptions(p parameter, index mockIndex) 
pbv1.StreamFilterOptions {
-       timeRange := timestamp.TimeRange{
-               Start:        time.Unix(int64(p.startTimestamp), 0),
-               End:          time.Unix(int64(p.endTimestamp), 0),
-               IncludeStart: true,
-               IncludeEnd:   true,
-       }
-       entities := make([][]*modelv1.TagValue, 0)
-       for i := 1; i <= p.seriesCount; i++ {
-               entity := []*modelv1.TagValue{
-                       {
-                               Value: &modelv1.TagValue_Str{
-                                       Str: &modelv1.Str{
-                                               Value: entityTagValuePrefix + 
strconv.Itoa(i),
-                                       },
-                               },
-                       },
-               }
-               entities = append(entities, entity)
-       }
-       num := generateRandomNumber(int64(p.tagCardinality))
-       value := filterTagValuePrefix + strconv.Itoa(num)
-       filter := mockFilter{
-               index: index,
-               value: value,
-       }
-       tagProjection := pbv1.TagProjection{
-               Family: "benchmark-family",
-               Names:  []string{"entity-tag", "filter-tag"},
-       }
-       return pbv1.StreamFilterOptions{
-               Name:           "benchmark",
-               TimeRange:      &timeRange,
-               Entities:       entities,
-               Filter:         filter,
-               TagProjection:  []pbv1.TagProjection{tagProjection},
-               MaxElementSize: math.MaxInt32,
-       }
-}
-
-func generateStreamSortOptions(p parameter, index mockIndex) 
pbv1.StreamSortOptions {
+func generateStreamQueryOptions(p parameter, index mockIndex) 
pbv1.StreamQueryOptions {
        timeRange := timestamp.TimeRange{
                Start:        time.Unix(int64(p.startTimestamp), 0),
                End:          time.Unix(int64(p.endTimestamp), 0),
@@ -368,7 +330,7 @@ func generateStreamSortOptions(p parameter, index 
mockIndex) pbv1.StreamSortOpti
                Family: "benchmark-family",
                Names:  []string{"entity-tag", "filter-tag"},
        }
-       return pbv1.StreamSortOptions{
+       return pbv1.StreamQueryOptions{
                Name:           "benchmark",
                TimeRange:      &timeRange,
                Entities:       entities,
@@ -385,10 +347,11 @@ func BenchmarkFilter(b *testing.B) {
                esList, docsList, idx := generateData(p)
                db := write(b, p, esList, docsList)
                s := generateStream(db)
-               sfo := generateStreamFilterOptions(p, idx)
-               b.Run("filter", func(b *testing.B) {
-                       _, err := s.Filter(context.TODO(), sfo)
+               sqo := generateStreamQueryOptions(p, idx)
+               b.Run("filter-"+p.scenario, func(b *testing.B) {
+                       res, err := s.Filter(context.TODO(), sqo)
                        require.NoError(b, err)
+                       logicalstream.BuildElementsFromStreamResult(res)
                })
        }
 }
@@ -399,9 +362,9 @@ func BenchmarkSort(b *testing.B) {
                esList, docsList, idx := generateData(p)
                db := write(b, p, esList, docsList)
                s := generateStream(db)
-               sso := generateStreamSortOptions(p, idx)
-               b.Run("sort", func(b *testing.B) {
-                       _, err := s.Sort(context.TODO(), sso)
+               sqo := generateStreamQueryOptions(p, idx)
+               b.Run("sort-"+p.scenario, func(b *testing.B) {
+                       _, err := s.Sort(context.TODO(), sqo)
                        require.NoError(b, err)
                })
        }
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 63342b11..84b73b01 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -408,16 +408,17 @@ func releaseBlock(b *block) {
 var blockPool sync.Pool
 
 type blockCursor struct {
-       p                *part
-       timestamps       []int64
-       elementIDs       []string
-       tagFamilies      []tagFamily
-       tagValuesDecoder encoding.BytesBlockDecoder
-       tagProjection    []pbv1.TagProjection
-       bm               blockMetadata
-       idx              int
-       minTimestamp     int64
-       maxTimestamp     int64
+       p                  *part
+       timestamps         []int64
+       expectedTimestamps []int64
+       elementIDs         []string
+       tagFamilies        []tagFamily
+       tagValuesDecoder   encoding.BytesBlockDecoder
+       tagProjection      []pbv1.TagProjection
+       bm                 blockMetadata
+       idx                int
+       minTimestamp       int64
+       maxTimestamp       int64
 }
 
 func (bc *blockCursor) reset() {
@@ -438,13 +439,17 @@ func (bc *blockCursor) reset() {
        bc.tagFamilies = tff[:0]
 }
 
-func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts 
queryOptions) {
+func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) {
        bc.reset()
        bc.p = p
        bc.bm.copyFrom(bm)
-       bc.minTimestamp = queryOpts.minTimestamp
-       bc.maxTimestamp = queryOpts.maxTimestamp
-       bc.tagProjection = queryOpts.TagProjection
+       bc.minTimestamp = opts.minTimestamp
+       bc.maxTimestamp = opts.maxTimestamp
+       bc.tagProjection = opts.TagProjection
+       if opts.elementRefMap != nil {
+               seriesID := bc.bm.seriesID
+               bc.expectedTimestamps = opts.elementRefMap[seriesID]
+       }
 }
 
 func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
@@ -543,12 +548,30 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        bc.bm.tagFamilies = tf
        tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)
 
-       start, end, ok := timestamp.FindRange(tmpBlock.timestamps, 
bc.minTimestamp, bc.maxTimestamp)
-       if !ok {
-               return false
+       idxList := make([]int, 0)
+       var start, end int
+       if bc.expectedTimestamps != nil {
+               for _, ts := range bc.expectedTimestamps {
+                       idx := timestamp.Find(tmpBlock.timestamps, ts)
+                       if idx == -1 {
+                               continue
+                       }
+                       idxList = append(idxList, idx)
+                       bc.timestamps = append(bc.timestamps, 
tmpBlock.timestamps[idx])
+                       bc.elementIDs = append(bc.elementIDs, 
tmpBlock.elementIDs[idx])
+               }
+               if len(bc.timestamps) == 0 {
+                       return false
+               }
+       } else {
+               s, e, ok := timestamp.FindRange(tmpBlock.timestamps, 
bc.minTimestamp, bc.maxTimestamp)
+               start, end = s, e
+               if !ok {
+                       return false
+               }
+               bc.timestamps = append(bc.timestamps, 
tmpBlock.timestamps[s:e+1]...)
+               bc.elementIDs = append(bc.elementIDs, 
tmpBlock.elementIDs[s:e+1]...)
        }
-       bc.timestamps = append(bc.timestamps, 
tmpBlock.timestamps[start:end+1]...)
-       bc.elementIDs = append(bc.elementIDs, 
tmpBlock.elementIDs[start:end+1]...)
 
        for i, projection := range bc.bm.tagProjection {
                tf := tagFamily{
@@ -559,13 +582,19 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
                        t := tag{
                                name: name,
                        }
-                       if tmpBlock.tagFamilies[i].tags[blockIndex].name == 
name {
+                       if len(tmpBlock.tagFamilies[i].tags) != 0 && 
tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
                                t.valueType = 
tmpBlock.tagFamilies[i].tags[blockIndex].valueType
                                if 
len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != 
len(tmpBlock.timestamps) {
                                        logger.Panicf("unexpected number of 
values for tags %q: got %d; want %d",
                                                
tmpBlock.tagFamilies[i].tags[blockIndex].name, 
len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
                                }
-                               t.values = append(t.values, 
tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
+                               if bc.expectedTimestamps != nil {
+                                       for _, idx := range idxList {
+                                               t.values = append(t.values, 
tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
+                                       }
+                               } else {
+                                       t.values = append(t.values, 
tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
+                               }
                        }
                        blockIndex++
                        tf.tags = append(tf.tags, t)
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index a6f31db1..7c02bf0d 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -87,6 +87,9 @@ func (e *elementIndex) Search(_ context.Context, seriesList 
pbv1.SeriesList, fil
                if pl == nil {
                        pl = roaring.DummyPostingList
                }
+               if pl.IsEmpty() {
+                       continue
+               }
                timestamps := pl.ToSlice()
                sort.Slice(timestamps, func(i, j int) bool {
                        return timestamps[i] < timestamps[j]
diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go
index 62fca5f0..f225acf9 100644
--- a/banyand/stream/iter_builder.go
+++ b/banyand/stream/iter_builder.go
@@ -31,20 +31,20 @@ import (
 type filterFn func(itemID uint64) bool
 
 func (s *stream) buildSeriesByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTable],
-       seriesList pbv1.SeriesList, sso pbv1.StreamSortOptions,
+       seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
 ) (series []*searcherIterator, err error) {
        timeFilter := func(itemID uint64) bool {
-               return sso.TimeRange.Contains(int64(itemID))
+               return sqo.TimeRange.Contains(int64(itemID))
        }
-       indexRuleForSorting := sso.Order.Index
+       indexRuleForSorting := sqo.Order.Index
        if len(indexRuleForSorting.Tags) != 1 {
                return nil, fmt.Errorf("only support one tag for sorting, but 
got %d", len(indexRuleForSorting.Tags))
        }
        sortedTag := indexRuleForSorting.Tags[0]
        tl := newTagLocation()
-       for i := range sso.TagProjection {
-               for j := range sso.TagProjection[i].Names {
-                       if sso.TagProjection[i].Names[j] == sortedTag {
+       for i := range sqo.TagProjection {
+               for j := range sqo.TagProjection[i].Names {
+                       if sqo.TagProjection[i].Names[j] == sortedTag {
                                tl.familyIndex, tl.tagIndex = i, j
                        }
                }
@@ -52,13 +52,13 @@ func (s *stream) buildSeriesByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTa
        if !tl.valid() {
                return nil, fmt.Errorf("sorted tag %s not found in tag 
projection", sortedTag)
        }
-       entityMap, tagSpecIndex, tagProjIndex, sidToIndex := 
s.genIndex(sso.TagProjection, seriesList)
+       entityMap, tagSpecIndex, tagProjIndex, sidToIndex := 
s.genIndex(sqo.TagProjection, seriesList)
        sids := seriesList.IDs()
        for _, tw := range tableWrappers {
                seriesFilter := make(map[common.SeriesID]filterFn)
-               if sso.Filter != nil {
+               if sqo.Filter != nil {
                        for i := range sids {
-                               pl, errExe := sso.Filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
+                               pl, errExe := sqo.Filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
                                        return tw.Table().Index().store, nil
                                }, sids[i])
                                if errExe != nil {
@@ -79,14 +79,14 @@ func (s *stream) buildSeriesByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTa
                        IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
                        Analyzer:    indexRuleForSorting.GetAnalyzer(),
                }
-               inner, err = tw.Table().Index().Sort(sids, fieldKey, 
sso.Order.Sort, sso.MaxElementSize)
+               inner, err = tw.Table().Index().Sort(sids, fieldKey, 
sqo.Order.Sort, sqo.MaxElementSize)
                if err != nil {
                        return nil, err
                }
 
                if inner != nil {
                        series = append(series, newSearcherIterator(s.l, inner, 
tw.Table(),
-                               seriesFilter, timeFilter, sso.TagProjection, tl,
+                               seriesFilter, timeFilter, sqo.TagProjection, tl,
                                tagSpecIndex, tagProjIndex, sidToIndex, 
seriesList, entityMap))
                }
        }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 06a27443..ce29a725 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -38,6 +38,7 @@ import (
 )
 
 type queryOptions struct {
+       elementRefMap map[common.SeriesID][]int64
        pbv1.StreamQueryOptions
        minTimestamp int64
        maxTimestamp int64
@@ -156,58 +157,77 @@ func (qr *queryResult) Pull() *pbv1.StreamResult {
                if len(qr.data) == 0 {
                        return nil
                }
-               // TODO:// Parallel load
-               tmpBlock := generateBlock()
-               defer releaseBlock(tmpBlock)
+
+               cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
-                       if !qr.data[i].loadData(tmpBlock) {
-                               qr.data = append(qr.data[:i], qr.data[i+1:]...)
-                               i--
-                       }
-                       if qr.schema.GetEntity() == nil || 
len(qr.schema.GetEntity().GetTagNames()) == 0 {
-                               continue
-                       }
-                       sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
-                       series := qr.seriesList[sidIndex]
-                       entityMap := make(map[string]int)
-                       tagFamilyMap := make(map[string]int)
-                       for idx, entity := range 
qr.schema.GetEntity().GetTagNames() {
-                               entityMap[entity] = idx + 1
-                       }
-                       for idx, tagFamily := range qr.data[i].tagFamilies {
-                               tagFamilyMap[tagFamily.name] = idx + 1
-                       }
-                       for _, tagFamilyProj := range qr.data[i].tagProjection {
-                               for j, tagProj := range tagFamilyProj.Names {
-                                       offset := qr.tagNameIndex[tagProj]
-                                       tagFamilySpec := 
qr.schema.GetTagFamilies()[offset.FamilyOffset]
-                                       tagSpec := 
tagFamilySpec.GetTags()[offset.TagOffset]
-                                       if tagSpec.IndexedOnly {
-                                               continue
-                                       }
-                                       entityPos := entityMap[tagProj]
-                                       tagFamilyPos := 
tagFamilyMap[tagFamilyProj.Family]
-                                       if entityPos == 0 {
-                                               continue
-                                       }
-                                       if tagFamilyPos == 0 {
-                                               
qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{
-                                                       name: 
tagFamilyProj.Family,
-                                                       tags: make([]tag, 0),
+                       go func(i int) {
+                               tmpBlock := generateBlock()
+                               defer releaseBlock(tmpBlock)
+                               if !qr.data[i].loadData(tmpBlock) {
+                                       cursorChan <- i
+                                       return
+                               }
+                               if qr.schema.GetEntity() == nil || 
len(qr.schema.GetEntity().GetTagNames()) == 0 {
+                                       cursorChan <- -1
+                                       return
+                               }
+                               sidIndex := 
qr.sidToIndex[qr.data[i].bm.seriesID]
+                               series := qr.seriesList[sidIndex]
+                               entityMap := make(map[string]int)
+                               tagFamilyMap := make(map[string]int)
+                               for idx, entity := range 
qr.schema.GetEntity().GetTagNames() {
+                                       entityMap[entity] = idx + 1
+                               }
+                               for idx, tagFamily := range 
qr.data[i].tagFamilies {
+                                       tagFamilyMap[tagFamily.name] = idx + 1
+                               }
+                               for _, tagFamilyProj := range 
qr.data[i].tagProjection {
+                                       for j, tagProj := range 
tagFamilyProj.Names {
+                                               offset := 
qr.tagNameIndex[tagProj]
+                                               tagFamilySpec := 
qr.schema.GetTagFamilies()[offset.FamilyOffset]
+                                               tagSpec := 
tagFamilySpec.GetTags()[offset.TagOffset]
+                                               if tagSpec.IndexedOnly {
+                                                       continue
+                                               }
+                                               entityPos := entityMap[tagProj]
+                                               tagFamilyPos := 
tagFamilyMap[tagFamilyProj.Family]
+                                               if entityPos == 0 {
+                                                       continue
+                                               }
+                                               if tagFamilyPos == 0 {
+                                                       
qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{
+                                                               name: 
tagFamilyProj.Family,
+                                                               tags: 
make([]tag, 0),
+                                                       }
+                                               }
+                                               valueType := 
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
+                                               
qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{
+                                                       name:      tagProj,
+                                                       values:    
mustEncodeTagValue(tagProj, tagSpec.GetType(), 
series.EntityValues[entityPos-1], len(qr.data[i].timestamps)),
+                                                       valueType: valueType,
                                                }
-                                       }
-                                       valueType := 
pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1])
-                                       
qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{
-                                               name:      tagProj,
-                                               values:    
mustEncodeTagValue(tagProj, tagSpec.GetType(), 
series.EntityValues[entityPos-1], len(qr.data[i].timestamps)),
-                                               valueType: valueType,
                                        }
                                }
-                       }
-                       if qr.orderByTimestampDesc() {
-                               qr.data[i].idx = len(qr.data[i].timestamps) - 1
+                               if qr.orderByTimestampDesc() {
+                                       qr.data[i].idx = 
len(qr.data[i].timestamps) - 1
+                               }
+                               cursorChan <- -1
+                       }(i)
+               }
+
+               blankCursorList := []int{}
+               for completed := 0; completed < len(qr.data); completed++ {
+                       result := <-cursorChan
+                       if result != -1 {
+                               blankCursorList = append(blankCursorList, 
result)
                        }
                }
+               sort.Slice(blankCursorList, func(i, j int) bool {
+                       return blankCursorList[i] > blankCursorList[j]
+               })
+               for _, index := range blankCursorList {
+                       qr.data = append(qr.data[:index], qr.data[index+1:]...)
+               }
                qr.loaded = true
                heap.Init(qr)
        }
@@ -343,88 +363,118 @@ func (s *stream) genIndex(tagProj []pbv1.TagProjection, 
seriesList pbv1.SeriesLi
        return entityMap, tagSpecIndex, tagProjIndex, sidToIndex
 }
 
-func (s *stream) Filter(ctx context.Context, sfo pbv1.StreamFilterOptions) 
(sfr pbv1.StreamFilterResult, err error) {
-       if sfo.TimeRange == nil || len(sfo.Entities) < 1 {
+func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error) {
+       if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
        }
-       if len(sfo.TagProjection) == 0 {
+       if len(sqo.TagProjection) == 0 {
                return nil, errors.New("invalid query options: tagProjection is 
required")
        }
        db := s.databaseSupplier.SupplyTSDB()
+       var result queryResult
        if db == nil {
-               return sfr, nil
+               return &result, nil
        }
        tsdb := db.(storage.TSDB[*tsTable, option])
-       tabWrappers := tsdb.SelectTSTables(*sfo.TimeRange)
-       sort.Slice(tabWrappers, func(i, j int) bool {
-               return 
tabWrappers[i].GetTimeRange().Start.Before(tabWrappers[j].GetTimeRange().Start)
-       })
+       tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
        defer func() {
                for i := range tabWrappers {
                        tabWrappers[i].DecRef()
                }
        }()
-
-       series := make([]*pbv1.Series, len(sfo.Entities))
-       for i := range sfo.Entities {
+       series := make([]*pbv1.Series, len(sqo.Entities))
+       for i := range sqo.Entities {
                series[i] = &pbv1.Series{
-                       Subject:      sfo.Name,
-                       EntityValues: sfo.Entities[i],
+                       Subject:      sqo.Name,
+                       EntityValues: sqo.Entities[i],
                }
        }
-       seriesList, err := tsdb.Lookup(ctx, series)
+       sl, err := tsdb.Lookup(ctx, series)
        if err != nil {
                return nil, err
        }
-       if len(seriesList) == 0 {
-               return sfr, nil
-       }
 
-       entityMap, tagSpecIndex, tagProjIndex, sidToIndex := 
s.genIndex(sfo.TagProjection, seriesList)
-       ces := newColumnElements()
-       for _, tw := range tabWrappers {
-               if len(ces.timestamp) >= sfo.MaxElementSize {
-                       break
-               }
-               index := tw.Table().Index()
-               erl, err := index.Search(ctx, seriesList, sfo.Filter, 
sfo.TimeRange)
-               if err != nil {
-                       return nil, err
+       if len(sl) < 1 {
+               return &result, nil
+       }
+       var sids []common.SeriesID
+       for i := range sl {
+               sids = append(sids, sl[i].ID)
+       }
+       var parts []*part
+       qo := queryOptions{
+               StreamQueryOptions: sqo,
+               minTimestamp:       sqo.TimeRange.Start.UnixNano(),
+               maxTimestamp:       sqo.TimeRange.End.UnixNano(),
+       }
+       var n int
+       for i := range tabWrappers {
+               s := tabWrappers[i].Table().currentSnapshot()
+               if s == nil {
+                       continue
                }
-               if len(ces.timestamp)+len(erl) > sfo.MaxElementSize {
-                       erl = erl[:sfo.MaxElementSize-len(ces.timestamp)]
+               parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
+               if n < 1 {
+                       s.decRef()
+                       continue
                }
-               for _, er := range erl {
-                       e, count, err := tw.Table().getElement(er.seriesID, 
er.timestamp, sfo.TagProjection)
-                       if err != nil {
-                               return nil, err
-                       }
-                       if len(tagProjIndex) != 0 {
-                               for entity, offset := range tagProjIndex {
-                                       tagSpec := tagSpecIndex[entity]
-                                       if tagSpec.IndexedOnly {
-                                               continue
-                                       }
-                                       series := 
seriesList[sidToIndex[er.seriesID]]
-                                       entityPos := entityMap[entity] - 1
-                                       
e.tagFamilies[offset.FamilyOffset].tags[offset.TagOffset] = tag{
-                                               name:      entity,
-                                               values:    
mustEncodeTagValue(entity, tagSpec.GetType(), series.EntityValues[entityPos], 
count),
-                                               valueType: 
pbv1.MustTagValueToValueType(series.EntityValues[entityPos]),
-                                       }
-                               }
+               result.snapshots = append(result.snapshots, s)
+       }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
+       // TODO: cache tstIter
+       var ti tstIter
+       defer ti.reset()
+       originalSids := make([]common.SeriesID, len(sids))
+       copy(originalSids, sids)
+       sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
+       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       if ti.Error() != nil {
+               return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error())
+       }
+       for ti.nextBlock() {
+               bc := generateBlockCursor()
+               p := ti.piHeap[0]
+               bc.init(p.p, p.curBlock, qo)
+               result.data = append(result.data, bc)
+       }
+       if ti.Error() != nil {
+               return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
+       }
+
+       entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl)
+       result.entityMap = entityMap
+       result.sidToIndex = sidToIndex
+       result.tagNameIndex = make(map[string]partition.TagLocator)
+       result.schema = s.schema
+       result.seriesList = sl
+       for i, si := range originalSids {
+               result.sidToIndex[si] = i
+       }
+       for i, tagFamilySpec := range s.schema.GetTagFamilies() {
+               for j, tagSpec := range tagFamilySpec.GetTags() {
+                       result.tagNameIndex[tagSpec.GetName()] = 
partition.TagLocator{
+                               FamilyOffset: i,
+                               TagOffset:    j,
                        }
-                       ces.BuildFromElement(e, sfo.TagProjection)
                }
        }
-       return ces, nil
+       result.orderByTS = true
+       if sqo.Order == nil {
+               result.ascTS = true
+               return &result, nil
+       }
+       if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
+               result.ascTS = true
+       }
+       return &result, nil
 }
 
-func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr 
pbv1.StreamSortResult, err error) {
-       if sso.TimeRange == nil || len(sso.Entities) < 1 {
+func (s *stream) Sort(ctx context.Context, sqo pbv1.StreamQueryOptions) (ssr 
pbv1.StreamSortResult, err error) {
+       if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
        }
-       if len(sso.TagProjection) == 0 {
+       if len(sqo.TagProjection) == 0 {
                return nil, errors.New("invalid query options: tagProjection is 
required")
        }
        db := s.databaseSupplier.SupplyTSDB()
@@ -432,17 +482,18 @@ func (s *stream) Sort(ctx context.Context, sso 
pbv1.StreamSortOptions) (ssr pbv1
                return ssr, nil
        }
        tsdb := db.(storage.TSDB[*tsTable, option])
-       tabWrappers := tsdb.SelectTSTables(*sso.TimeRange)
+       tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
        defer func() {
                for i := range tabWrappers {
                        tabWrappers[i].DecRef()
                }
        }()
-       series := make([]*pbv1.Series, len(sso.Entities))
-       for i := range sso.Entities {
+
+       series := make([]*pbv1.Series, len(sqo.Entities))
+       for i := range sqo.Entities {
                series[i] = &pbv1.Series{
-                       Subject:      sso.Name,
-                       EntityValues: sso.Entities[i],
+                       Subject:      sqo.Name,
+                       EntityValues: sqo.Entities[i],
                }
        }
        seriesList, err := tsdb.Lookup(ctx, series)
@@ -453,16 +504,15 @@ func (s *stream) Sort(ctx context.Context, sso 
pbv1.StreamSortOptions) (ssr pbv1
                return ssr, nil
        }
 
-       iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sso)
+       iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sqo)
        if err != nil {
                return nil, err
        }
-
        if len(iters) == 0 {
                return ssr, nil
        }
 
-       it := newItemIter(iters, sso.Order.Sort)
+       it := newItemIter(iters, sqo.Order.Sort)
        defer func() {
                err = multierr.Append(err, it.Close())
        }()
@@ -471,15 +521,27 @@ func (s *stream) Sort(ctx context.Context, sso 
pbv1.StreamSortOptions) (ssr pbv1
        for it.Next() {
                nextItem := it.Val()
                e := nextItem.element
-               ces.BuildFromElement(e, sso.TagProjection)
-               if len(ces.timestamp) >= sso.MaxElementSize {
+               ces.BuildFromElement(e, sqo.TagProjection)
+               if len(ces.timestamp) >= sqo.MaxElementSize {
                        break
                }
        }
        return ces, err
 }
 
-func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error) {
+// newItemIter returns an ItemIterator which merges several tsdb.Iterator by 
input sorting order.
+func newItemIter(iters []*searcherIterator, s modelv1.Sort) 
itersort.Iterator[item] {
+       var ii []itersort.Iterator[item]
+       for _, iter := range iters {
+               ii = append(ii, iter)
+       }
+       if s == modelv1.Sort_SORT_DESC {
+               return itersort.NewItemIter[item](ii, true)
+       }
+       return itersort.NewItemIter[item](ii, false)
+}
+
+func (s *stream) Filter(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr 
pbv1.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
        }
@@ -489,7 +551,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
        db := s.databaseSupplier.SupplyTSDB()
        var result queryResult
        if db == nil {
-               return &result, nil
+               return sqr, nil
        }
        tsdb := db.(storage.TSDB[*tsTable, option])
        tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
@@ -498,6 +560,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        tabWrappers[i].DecRef()
                }
        }()
+
        series := make([]*pbv1.Series, len(sqo.Entities))
        for i := range sqo.Entities {
                series[i] = &pbv1.Series{
@@ -505,24 +568,46 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        EntityValues: sqo.Entities[i],
                }
        }
-       sl, err := tsdb.Lookup(ctx, series)
+       seriesList, err := tsdb.Lookup(ctx, series)
        if err != nil {
                return nil, err
        }
+       if len(seriesList) == 0 {
+               return sqr, nil
+       }
 
-       if len(sl) < 1 {
-               return &result, nil
+       var elementRefList []elementRef
+       for _, tw := range tabWrappers {
+               index := tw.Table().Index()
+               erl, err := index.Search(ctx, seriesList, sqo.Filter, 
sqo.TimeRange)
+               if err != nil {
+                       return nil, err
+               }
+               elementRefList = append(elementRefList, erl...)
+               if len(elementRefList) > sqo.MaxElementSize {
+                       elementRefList = elementRefList[:sqo.MaxElementSize]
+                       break
+               }
        }
-       var sids []common.SeriesID
-       for i := range sl {
-               sids = append(sids, sl[i].ID)
+       var elementRefMap map[common.SeriesID][]int64
+       if len(elementRefList) != 0 {
+               elementRefMap = make(map[common.SeriesID][]int64)
+               for _, ref := range elementRefList {
+                       if _, ok := elementRefMap[ref.seriesID]; !ok {
+                               elementRefMap[ref.seriesID] = 
[]int64{ref.timestamp}
+                       } else {
+                               elementRefMap[ref.seriesID] = 
append(elementRefMap[ref.seriesID], ref.timestamp)
+                       }
+               }
        }
-       var parts []*part
        qo := queryOptions{
                StreamQueryOptions: sqo,
                minTimestamp:       sqo.TimeRange.Start.UnixNano(),
                maxTimestamp:       sqo.TimeRange.End.UnixNano(),
+               elementRefMap:      elementRefMap,
        }
+
+       var parts []*part
        var n int
        for i := range tabWrappers {
                s := tabWrappers[i].Table().currentSnapshot()
@@ -539,31 +624,41 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
        bma := generateBlockMetadataArray()
        defer releaseBlockMetadataArray(bma)
        // TODO: cache tstIter
-       var tstIter tstIter
-       defer tstIter.reset()
+       var ti tstIter
+       defer ti.reset()
+       var sids []common.SeriesID
+       for i := 0; i < len(seriesList); i++ {
+               sid := seriesList[i].ID
+               if _, ok := elementRefMap[sid]; !ok {
+                       seriesList = append(seriesList[:i], seriesList[i+1:]...)
+                       i--
+                       continue
+               }
+               sids = append(sids, sid)
+       }
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
-       if tstIter.Error() != nil {
-               return nil, fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
+       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       if ti.Error() != nil {
+               return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error())
        }
-       for tstIter.nextBlock() {
+       for ti.nextBlock() {
                bc := generateBlockCursor()
-               p := tstIter.piHeap[0]
+               p := ti.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
                result.data = append(result.data, bc)
        }
-       if tstIter.Error() != nil {
-               return nil, fmt.Errorf("cannot iterate tstIter: %w", 
tstIter.Error())
+       if ti.Error() != nil {
+               return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
        }
 
-       entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl)
+       entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, seriesList)
        result.entityMap = entityMap
        result.sidToIndex = sidToIndex
        result.tagNameIndex = make(map[string]partition.TagLocator)
        result.schema = s.schema
-       result.seriesList = sl
+       result.seriesList = seriesList
        for i, si := range originalSids {
                result.sidToIndex[si] = i
        }
@@ -575,30 +670,13 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        }
                }
        }
+       result.orderByTS = true
        if sqo.Order == nil {
-               result.orderByTS = true
                result.ascTS = true
                return &result, nil
        }
-       if sqo.Order.Index == nil {
-               result.orderByTS = true
-               if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
-                       result.ascTS = true
-               }
-               return &result, nil
+       if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
+               result.ascTS = true
        }
-
        return &result, nil
 }
-
-// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by 
input sorting order.
-func newItemIter(iters []*searcherIterator, s modelv1.Sort) 
itersort.Iterator[item] {
-       var ii []itersort.Iterator[item]
-       for _, iter := range iters {
-               ii = append(ii, iter)
-       }
-       if s == modelv1.Sort_SORT_DESC {
-               return itersort.NewItemIter[item](ii, true)
-       }
-       return itersort.NewItemIter[item](ii, false)
-}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 0ba53a9d..8c6f1b7a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -61,8 +61,8 @@ type Stream interface {
        GetSchema() *databasev1.Stream
        GetIndexRules() []*databasev1.IndexRule
        Query(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
-       Sort(ctx context.Context, opts pbv1.StreamSortOptions) 
(pbv1.StreamSortResult, error)
-       Filter(ctx context.Context, opts pbv1.StreamFilterOptions) 
(pbv1.StreamFilterResult, error)
+       Sort(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamSortResult, error)
+       Filter(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
 }
 
 var _ Stream = (*stream)(nil)
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index dce4d832..140a0c32 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -132,27 +132,6 @@ type TagProjection struct {
 
 // StreamQueryOptions is the options of a stream query.
 type StreamQueryOptions struct {
-       Name          string
-       TimeRange     *timestamp.TimeRange
-       Entities      [][]*modelv1.TagValue
-       Filter        index.Filter
-       Order         *OrderBy
-       TagProjection []TagProjection
-}
-
-// StreamSortOptions is the options of a stream sort.
-type StreamSortOptions struct {
-       Name           string
-       TimeRange      *timestamp.TimeRange
-       Entities       [][]*modelv1.TagValue
-       Filter         index.Filter
-       Order          *OrderBy
-       TagProjection  []TagProjection
-       MaxElementSize int
-}
-
-// StreamFilterOptions is the options of a stream filter.
-type StreamFilterOptions struct {
        Name           string
        TimeRange      *timestamp.TimeRange
        Entities       [][]*modelv1.TagValue
@@ -173,11 +152,6 @@ type StreamSortResult interface {
        Pull() *StreamColumnResult
 }
 
-// StreamFilterResult is the result of a stream filter.
-type StreamFilterResult interface {
-       Pull() *StreamColumnResult
-}
-
 // MeasureQueryOptions is the options of a measure query.
 type MeasureQueryOptions struct {
        Name            string
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 40b73dde..a5edf44c 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -31,8 +31,8 @@ import (
 // StreamExecutionContext allows retrieving data through the stream module.
 type StreamExecutionContext interface {
        Query(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
-       Sort(ctx context.Context, opts pbv1.StreamSortOptions) 
(pbv1.StreamSortResult, error)
-       Filter(ctx context.Context, opts pbv1.StreamFilterOptions) 
(pbv1.StreamFilterResult, error)
+       Sort(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamSortResult, error)
+       Filter(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
 }
 
 // StreamExecutionContextKey is the key of stream execution context in 
context.Context.
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 99ce11f4..2e7a39fb 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -73,7 +73,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        ec := executor.FromStreamExecutionContext(ctx)
 
        if i.order != nil && i.order.Index != nil {
-               ssr, err := ec.Sort(ctx, pbv1.StreamSortOptions{
+               ssr, err := ec.Sort(ctx, pbv1.StreamQueryOptions{
                        Name:           i.metadata.GetName(),
                        TimeRange:      &i.timeRange,
                        Entities:       i.entities,
@@ -93,7 +93,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        }
 
        if i.filter != nil && i.filter != logical.ENode {
-               sfr, err := ec.Filter(ctx, pbv1.StreamFilterOptions{
+               result, err := ec.Filter(ctx, pbv1.StreamQueryOptions{
                        Name:           i.metadata.GetName(),
                        TimeRange:      &i.timeRange,
                        Entities:       i.entities,
@@ -105,11 +105,10 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
                if err != nil {
                        return nil, err
                }
-               if sfr == nil {
+               if result == nil {
                        return nil, nil
                }
-               r := sfr.Pull()
-               return buildElementsFromColumnResult(r), nil
+               return BuildElementsFromStreamResult(result), nil
        }
 
        result, err := ec.Query(ctx, pbv1.StreamQueryOptions{
@@ -123,7 +122,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        if err != nil {
                return nil, fmt.Errorf("failed to query stream: %w", err)
        }
-       return buildElementsFromQueryResults(result), nil
+       return BuildElementsFromStreamResult(result), nil
 }
 
 func (i *localIndexScan) String() string {
@@ -167,7 +166,8 @@ func buildElementsFromColumnResult(r 
*pbv1.StreamColumnResult) (elements []*stre
        return
 }
 
-func buildElementsFromQueryResults(result pbv1.StreamQueryResult) (elements 
[]*streamv1.Element) {
+// BuildElementsFromStreamResult builds a slice of elements from the given 
stream query result.
+func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements 
[]*streamv1.Element) {
        deduplication := make(map[string]struct{})
        for {
                r := result.Pull()
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index 77603ed0..aaf719bb 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -148,3 +148,26 @@ func FindRange[T int64 | uint64](timestamps []T, min, max 
T) (int, int, bool) {
        }
        return start, end, start <= end
 }
+
+// Find returns the index of target in sorted 'timestamps', or -1 if absent.
+func Find(timestamps []int64, target int64) int {
+       if len(timestamps) == 0 {
+               return -1
+       }
+       if timestamps[0] > target || timestamps[len(timestamps)-1] < target {
+               return -1
+       }
+       left, right := 0, len(timestamps)-1
+       for left <= right {
+               mid := (left + right) / 2
+               if timestamps[mid] == target {
+                       return mid
+               }
+               if timestamps[mid] < target {
+                       left = mid + 1
+               } else {
+                       right = mid - 1
+               }
+       }
+       return -1
+}
diff --git a/test/cases/stream/data/input/order_asc.yaml 
b/test/cases/stream/data/input/order_asc.yaml
new file mode 100644
index 00000000..9963ac2d
--- /dev/null
+++ b/test/cases/stream/data/input/order_asc.yaml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+  tagFamilies:
+  - name: "searchable"
+    tags: ["trace_id", "duration"]
+  - name: "data"
+    tags: ["data_binary"]
+orderBy:
+  sort: "SORT_ASC"
diff --git a/test/cases/stream/data/input/order_desc.yaml 
b/test/cases/stream/data/input/order_desc.yaml
new file mode 100644
index 00000000..3c2d97d3
--- /dev/null
+++ b/test/cases/stream/data/input/order_desc.yaml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+  tagFamilies:
+  - name: "searchable"
+    tags: ["trace_id", "duration"]
+  - name: "data"
+    tags: ["data_binary"]
+orderBy:
+  sort: "SORT_DESC"
diff --git a/test/cases/stream/data/want/order_asc.yaml 
b/test/cases/stream/data/want/order_asc.yaml
new file mode 100644
index 00000000..725cf084
--- /dev/null
+++ b/test/cases/stream/data/want/order_asc.yaml
@@ -0,0 +1,103 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+  - elementId: "0"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "1"
+      - key: duration
+        value:
+          int:
+            value: "1000"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "1"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "2"
+      - key: duration
+        value:
+          int:
+            value: "500"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "2"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "3"
+      - key: duration
+        value:
+          int:
+            value: "30"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "3"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "4"
+      - key: duration
+        value:
+          int:
+            value: "60"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "4"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "5"
+      - key: duration
+        value:
+          int:
+            value: "300"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
diff --git a/test/cases/stream/data/want/order_desc.yaml 
b/test/cases/stream/data/want/order_desc.yaml
new file mode 100644
index 00000000..055f9449
--- /dev/null
+++ b/test/cases/stream/data/want/order_desc.yaml
@@ -0,0 +1,103 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+  - elementId: "4"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "5"
+      - key: duration
+        value:
+          int:
+            value: "300"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "3"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "4"
+      - key: duration
+        value:
+          int:
+            value: "60"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "2"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "3"
+      - key: duration
+        value:
+          int:
+            value: "30"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "1"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "2"
+      - key: duration
+        value:
+          int:
+            value: "500"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
+  - elementId: "0"
+    tagFamilies:
+    - name: searchable
+      tags:
+      - key: trace_id
+        value:
+          str:
+            value: "1"
+      - key: duration
+        value:
+          int:
+            value: "1000"
+    - name: data
+      tags:
+      - key: data_binary
+        value:
+          binaryData: YWJjMTIzIT8kKiYoKSctPUB+
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 66408bdd..200a9979 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -49,6 +49,8 @@ var _ = g.DescribeTable("Scanning Streams", func(args 
helpers.Args) {
        g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * 
time.Hour}),
        g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
        g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * 
time.Hour}),
+       g.Entry("order asc", helpers.Args{Input: "order_asc", Duration: 1 * 
time.Hour}),
+       g.Entry("order desc", helpers.Args{Input: "order_desc", Duration: 1 * 
time.Hour}),
        g.Entry("nothing", helpers.Args{
                Input:     "all",
                Begin:     timestamppb.New(time.Unix(0, 
0).Truncate(time.Millisecond)),


Reply via email to