This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit f6057ef76ddcc3d330bc94aeb890f0d04b5bf357
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jan 17 08:13:25 2025 +0800

    Introduce batch scan to stream query by timestamp
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/measure/topn.go         |   2 +-
 banyand/stream/block_scanner.go | 212 +++++++++
 banyand/stream/index.go         |   7 +-
 banyand/stream/merger_policy.go |   8 -
 banyand/stream/query.go         | 500 +++------------------
 banyand/stream/query_by_idx.go  | 321 ++++++++++++++
 banyand/stream/query_by_ts.go   | 265 +++++++++++
 banyand/stream/query_test.go    | 424 ------------------
 banyand/stream/stream.go        |  14 +-
 banyand/stream/trace.go         |   2 +-
 banyand/stream/tstable_test.go  |   7 -
 pkg/pb/v1/value.go              |   2 +-
 pkg/query/logical/optimizer.go  |   2 +-
 pkg/query/model/model.go        | 158 +++++++
 pkg/query/model/model_test.go   | 947 ++++++++++++++++++++++++++++++++++++++++
 15 files changed, 1976 insertions(+), 895 deletions(-)

diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index c2f7c40f..4170aef6 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -676,7 +676,7 @@ func (t *TopNValue) Unmarshal(src []byte, decoder 
*encoding.BytesBlockDecoder) e
        for i, ev := range t.entityValues {
                t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf, 
t.entities[i], ev)
                if err != nil {
-                       return fmt.Errorf("cannot unmarshal 
topNValue.entityValues[%d]: %w", i, err)
+                       return fmt.Errorf("cannot unmarshal 
topNValue.entityValues[%d]:%s %w", i, ev, err)
                }
                if len(t.entities[i]) != len(t.entityTagNames) {
                        return fmt.Errorf("entityValues[%d] length is not equal 
to entityTagNames", i)
diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go
new file mode 100644
index 00000000..dff5ecce
--- /dev/null
+++ b/banyand/stream/block_scanner.go
@@ -0,0 +1,212 @@
+package stream
+
+import (
+       "context"
+       "fmt"
+       "sort"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
+)
+
+const blockScannerBatchSize = 32
+
+type blockScanResult struct {
+       p  *part
+       qo queryOptions
+       bm blockMetadata
+}
+
+func (bs *blockScanResult) reset() {
+       bs.p = nil
+       bs.bm.reset()
+}
+
+type blockScanResultBatch struct {
+       err error
+       bss []blockScanResult
+}
+
+func (bsb *blockScanResultBatch) reset() {
+       for i := range bsb.bss {
+               bsb.bss[i].reset()
+       }
+       bsb.bss = bsb.bss[:0]
+}
+
+func generateBlockScanResultBatch() *blockScanResultBatch {
+       v := blockScanResultBatchPool.Get()
+       if v == nil {
+               return &blockScanResultBatch{
+                       bss: make([]blockScanResult, 0, blockScannerBatchSize),
+               }
+       }
+       return v
+}
+
+func releaseBlockScanResultBatch(bsb *blockScanResultBatch) {
+       bsb.reset()
+       blockScanResultBatchPool.Put(bsb)
+}
+
+var blockScanResultBatchPool = 
pool.Register[*blockScanResultBatch]("stream-blockScannerBatch")
+
+var shardScanConcurrencyCh = make(chan struct{}, cgroups.CPUs())
+
+type blockScanner struct {
+       segment   storage.Segment[*tsTable, *option]
+       series    []*pbv1.Series
+       seriesIDs []uint64
+       qo        queryOptions
+}
+
+func (q *blockScanner) searchSeries(ctx context.Context) error {
+       seriesFilter := roaring.NewPostingList()
+       sl, err := q.segment.Lookup(ctx, q.series)
+       if err != nil {
+               return err
+       }
+       for i := range sl {
+               if seriesFilter.Contains(uint64(sl[i].ID)) {
+                       continue
+               }
+               seriesFilter.Insert(uint64(sl[i].ID))
+               if q.qo.seriesToEntity == nil {
+                       q.qo.seriesToEntity = 
make(map[common.SeriesID][]*modelv1.TagValue)
+               }
+               q.qo.seriesToEntity[sl[i].ID] = sl[i].EntityValues
+               q.qo.sortedSids = append(q.qo.sortedSids, sl[i].ID)
+       }
+       if seriesFilter.IsEmpty() {
+               return nil
+       }
+       q.seriesIDs = seriesFilter.ToSlice()
+       sort.Slice(q.qo.sortedSids, func(i, j int) bool { return 
q.qo.sortedSids[i] < q.qo.sortedSids[j] })
+       return nil
+}
+
+func (q *blockScanner) scanShardsInParallel(ctx context.Context, wg 
*sync.WaitGroup, blockCh chan *blockScanResultBatch) []scanFinalizer {
+       tabs := q.segment.Tables()
+       finalizers := make([]scanFinalizer, len(tabs))
+       for i := range tabs {
+               select {
+               case shardScanConcurrencyCh <- struct{}{}:
+               case <-ctx.Done():
+                       return finalizers
+               }
+               wg.Add(1)
+               go func(idx int, tab *tsTable) {
+                       finalizers[idx] = q.scanBlocks(ctx, q.seriesIDs, tab, 
blockCh)
+                       wg.Done()
+                       <-shardScanConcurrencyCh
+               }(i, tabs[i])
+       }
+       return finalizers
+}
+
+func (q *blockScanner) scanBlocks(ctx context.Context, seriesList []uint64, 
tab *tsTable, blockCh chan *blockScanResultBatch) (sf scanFinalizer) {
+       s := tab.currentSnapshot()
+       if s == nil {
+               return nil
+       }
+       sf = s.decRef
+       filter, err := q.indexSearch(ctx, seriesList, tab)
+       if err != nil {
+               select {
+               case blockCh <- &blockScanResultBatch{err: err}:
+               case <-ctx.Done():
+               }
+               return
+       }
+       select {
+       case <-ctx.Done():
+               return
+       default:
+       }
+
+       parts, n := s.getParts(nil, q.qo.minTimestamp, q.qo.maxTimestamp)
+       if n < 1 {
+               return
+       }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
+       ti := generateTstIter()
+       defer releaseTstIter(ti)
+       ti.init(bma, parts, q.qo.sortedSids, q.qo.minTimestamp, 
q.qo.maxTimestamp)
+       batch := generateBlockScanResultBatch()
+       if ti.Error() != nil {
+               batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error())
+               select {
+               case blockCh <- batch:
+               case <-ctx.Done():
+                       releaseBlockScanResultBatch(batch)
+               }
+               return
+       }
+       for ti.nextBlock() {
+               p := ti.piHeap[0]
+               batch.bss = append(batch.bss, blockScanResult{
+                       p: p.p,
+               })
+               bs := &batch.bss[len(batch.bss)-1]
+               bs.qo.copyFrom(&q.qo)
+               bs.qo.elementFilter = filter
+               bs.bm.copyFrom(p.curBlock)
+               if len(batch.bss) >= cap(batch.bss) {
+                       select {
+                       case blockCh <- batch:
+                       case <-ctx.Done():
+                               releaseBlockScanResultBatch(batch)
+                               return
+                       }
+                       batch = generateBlockScanResultBatch()
+               }
+       }
+       if ti.Error() != nil {
+               batch.err = fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
+               select {
+               case blockCh <- batch:
+               case <-ctx.Done():
+                       releaseBlockScanResultBatch(batch)
+               }
+               return
+       }
+       if len(batch.bss) > 0 {
+               select {
+               case blockCh <- batch:
+               case <-ctx.Done():
+                       releaseBlockScanResultBatch(batch)
+               }
+               return
+       }
+       releaseBlockScanResultBatch(batch)
+       return
+}
+
+func (q *blockScanner) indexSearch(ctx context.Context, seriesList []uint64, 
tw *tsTable) (posting.List, error) {
+       if q.qo.Filter == nil || q.qo.Filter == logicalstream.ENode {
+               return nil, nil
+       }
+       pl, err := tw.Index().Search(ctx, seriesList, q.qo.Filter)
+       if err != nil {
+               return nil, err
+       }
+       if pl == nil {
+               return roaring.DummyPostingList, nil
+       }
+       return pl, nil
+}
+
+func (q *blockScanner) close() {
+       q.segment.DecRef()
+}
+
+type scanFinalizer func()
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 128308cd..a21f4b62 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -30,7 +30,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -71,9 +70,9 @@ func (e *elementIndex) Write(docs index.Documents) error {
        })
 }
 
-func (e *elementIndex) Search(ctx context.Context, seriesList pbv1.SeriesList, 
filter index.Filter) (posting.List, error) {
+func (e *elementIndex) Search(ctx context.Context, seriesList []uint64, filter 
index.Filter) (posting.List, error) {
        var result posting.List
-       for i, series := range seriesList {
+       for i, id := range seriesList {
                select {
                case <-ctx.Done():
                        return nil, errors.WithMessagef(ctx.Err(), "search 
series %d/%d", i, len(seriesList))
@@ -81,7 +80,7 @@ func (e *elementIndex) Search(ctx context.Context, seriesList 
pbv1.SeriesList, f
                }
                pl, err := filter.Execute(func(_ databasev1.IndexRule_Type) 
(index.Searcher, error) {
                        return e.store, nil
-               }, series.ID)
+               }, common.SeriesID(id))
                if err != nil {
                        return nil, err
                }
diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go
index ad881e58..97e62459 100644
--- a/banyand/stream/merger_policy.go
+++ b/banyand/stream/merger_policy.go
@@ -37,14 +37,6 @@ func newDefaultMergePolicy() *mergePolicy {
        return newMergePolicy(15, 1.7, run.Bytes(math.MaxInt64))
 }
 
-func newDefaultMergePolicyForTesting() *mergePolicy {
-       return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
-}
-
-func newDisabledMergePolicyForTesting() *mergePolicy {
-       return newMergePolicy(0, 0, 0)
-}
-
 // NewMergePolicy creates a MergePolicy with given parameters.
 func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize 
run.Bytes) *mergePolicy {
        return &mergePolicy{
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index e375ae16..062663c8 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -18,10 +18,8 @@
 package stream
 
 import (
-       "container/heap"
        "context"
        "fmt"
-       "sort"
 
        "github.com/pkg/errors"
 
@@ -35,9 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/query"
        logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
@@ -57,15 +53,14 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
        if db == nil {
                return nilResult, nil
        }
-       var result queryResult
        tsdb := db.(storage.TSDB[*tsTable, option])
-       result.segments = tsdb.SelectSegments(*sqo.TimeRange)
-       if len(result.segments) < 1 {
-               return &result, nil
+       segments := tsdb.SelectSegments(*sqo.TimeRange)
+       if len(segments) < 1 {
+               return bypassQueryResultInstance, nil
        }
        defer func() {
                if err != nil {
-                       result.Release()
+                       sqr.Release()
                }
        }()
        series := make([]*pbv1.Series, len(sqo.Entities))
@@ -75,7 +70,36 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                        EntityValues: sqo.Entities[i],
                }
        }
-       var seriesList, sl pbv1.SeriesList
+       qo := queryOptions{
+               StreamQueryOptions: sqo,
+               minTimestamp:       sqo.TimeRange.Start.UnixNano(),
+               maxTimestamp:       sqo.TimeRange.End.UnixNano(),
+       }
+
+       if sqo.Order == nil || sqo.Order.Index == nil {
+               result := &tsResult{
+                       segments: segments,
+                       series:   series,
+                       qo:       qo,
+                       sm:       s,
+               }
+               if sqo.Order == nil {
+                       result.asc = true
+               } else if sqo.Order.Sort == modelv1.Sort_SORT_ASC || 
sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
+                       result.asc = true
+               }
+               return result, nil
+       }
+       var result idxResult
+       result.segments = segments
+       result.sm = s
+       result.qo = queryOptions{
+               StreamQueryOptions: sqo,
+               minTimestamp:       sqo.TimeRange.Start.UnixNano(),
+               maxTimestamp:       sqo.TimeRange.End.UnixNano(),
+               seriesToEntity:     
make(map[common.SeriesID][]*modelv1.TagValue),
+       }
+       var sl pbv1.SeriesList
        seriesFilter := roaring.NewPostingList()
        for i := range result.segments {
                sl, err = result.segments[i].Lookup(ctx, series)
@@ -86,47 +110,20 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                        if seriesFilter.Contains(uint64(sl[j].ID)) {
                                continue
                        }
-                       seriesList = append(seriesList, sl[j])
                        seriesFilter.Insert(uint64(sl[j].ID))
+                       result.qo.seriesToEntity[sl[j].ID] = sl[j].EntityValues
                }
                result.tabs = append(result.tabs, 
result.segments[i].Tables()...)
        }
 
-       if len(seriesList) == 0 {
+       if seriesFilter.IsEmpty() {
                return &result, nil
        }
-       result.qo = queryOptions{
-               StreamQueryOptions: sqo,
-               minTimestamp:       sqo.TimeRange.Start.UnixNano(),
-               maxTimestamp:       sqo.TimeRange.End.UnixNano(),
-               seriesToEntity:     
make(map[common.SeriesID][]*modelv1.TagValue),
-       }
-       for i := range seriesList {
-               result.qo.seriesToEntity[seriesList[i].ID] = 
seriesList[i].EntityValues
-               result.qo.sortedSids = append(result.qo.sortedSids, 
seriesList[i].ID)
-       }
-       if result.qo.elementFilter, err = indexSearch(ctx, sqo, result.tabs, 
seriesList); err != nil {
+       sids := seriesFilter.ToSlice()
+       if result.qo.elementFilter, err = indexSearch(ctx, sqo, result.tabs, 
sids); err != nil {
                return nil, err
        }
-       result.tagNameIndex = make(map[string]partition.TagLocator)
-       result.schema = s.schema
-       for i, tagFamilySpec := range s.schema.GetTagFamilies() {
-               for j, tagSpec := range tagFamilySpec.GetTags() {
-                       result.tagNameIndex[tagSpec.GetName()] = 
partition.TagLocator{
-                               FamilyOffset: i,
-                               TagOffset:    j,
-                       }
-               }
-       }
-       if sqo.Order == nil {
-               result.orderByTS = true
-               result.asc = true
-               return &result, nil
-       }
-
-       if sqo.Order.Index == nil {
-               result.orderByTS = true
-       } else if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, 
seriesList); err != nil {
+       if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); 
err != nil {
                return nil, err
        }
        if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
@@ -144,411 +141,17 @@ type queryOptions struct {
        maxTimestamp int64
 }
 
-type queryResult struct {
-       sortingIter      itersort.Iterator[*index.DocumentResult]
-       tagNameIndex     map[string]partition.TagLocator
-       schema           *databasev1.Stream
-       tabs             []*tsTable
-       elementIDsSorted []uint64
-       data             []*blockCursor
-       snapshots        []*snapshot
-       segments         []storage.Segment[*tsTable, option]
-       qo               queryOptions
-       loaded           bool
-       orderByTS        bool
-       asc              bool
-}
-
-func (qr *queryResult) Pull(ctx context.Context) *model.StreamResult {
-       if qr.sortingIter == nil {
-               qo := qr.qo
-               sort.Slice(qo.sortedSids, func(i, j int) bool { return 
qo.sortedSids[i] < qo.sortedSids[j] })
-               return qr.load(ctx, qo)
-       }
-       if !qr.loaded {
-               qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize)
-               return qr.loadSortingData(ctx)
-       }
-       if v := qr.nextValue(); v != nil {
-               return v
-       }
-       qr.loaded = false
-       return qr.loadSortingData(ctx)
-}
-
-func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
-       var parts []*part
-       var n int
-       for i := range qr.tabs {
-               s := qr.tabs[i].currentSnapshot()
-               if s == nil {
-                       continue
-               }
-               parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
-               if n < 1 {
-                       s.decRef()
-                       continue
-               }
-               qr.snapshots = append(qr.snapshots, s)
-       }
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-       defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
-       defer defFn()
-       ti := generateTstIter()
-       defer releaseTstIter(ti)
-       sids := qo.sortedSids
-       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
-       if ti.Error() != nil {
-               return fmt.Errorf("cannot init tstIter: %w", ti.Error())
-       }
-       var hit int
-       for ti.nextBlock() {
-               if hit%checkDoneEvery == 0 {
-                       select {
-                       case <-ctx.Done():
-                               return errors.WithMessagef(ctx.Err(), 
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), 
len(ti.piHeap), len(ti.piPool))
-                       default:
-                       }
-               }
-               hit++
-               bc := generateBlockCursor()
-               p := ti.piHeap[0]
-               bc.init(p.p, p.curBlock, qo)
-               qr.data = append(qr.data, bc)
-       }
-       if ti.Error() != nil {
-               return fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
-       }
-       return nil
-}
-
-func (qr *queryResult) load(ctx context.Context, qo queryOptions) 
*model.StreamResult {
-       if !qr.loaded {
-               if err := qr.scanParts(ctx, qo); err != nil {
-                       return &model.StreamResult{
-                               Error: err,
-                       }
-               }
-               if len(qr.data) == 0 {
-                       return nil
-               }
-
-               cursorChan := make(chan int, len(qr.data))
-               for i := 0; i < len(qr.data); i++ {
-                       go func(i int) {
-                               select {
-                               case <-ctx.Done():
-                                       cursorChan <- i
-                                       return
-                               default:
-                               }
-                               tmpBlock := generateBlock()
-                               defer releaseBlock(tmpBlock)
-                               if !qr.data[i].loadData(tmpBlock) {
-                                       cursorChan <- i
-                                       return
-                               }
-                               if qr.schema.GetEntity() == nil || 
len(qr.schema.GetEntity().GetTagNames()) == 0 {
-                                       cursorChan <- -1
-                                       return
-                               }
-                               entityValues := 
qo.seriesToEntity[qr.data[i].bm.seriesID]
-                               entityMap := make(map[string]int)
-                               tagFamilyMap := make(map[string]int)
-                               for idx, entity := range 
qr.schema.GetEntity().GetTagNames() {
-                                       entityMap[entity] = idx + 1
-                               }
-                               for idx, tagFamily := range 
qr.data[i].tagFamilies {
-                                       tagFamilyMap[tagFamily.name] = idx + 1
-                               }
-                               for _, tagFamilyProj := range 
qr.data[i].tagProjection {
-                                       for j, tagProj := range 
tagFamilyProj.Names {
-                                               offset := 
qr.tagNameIndex[tagProj]
-                                               tagFamilySpec := 
qr.schema.GetTagFamilies()[offset.FamilyOffset]
-                                               tagSpec := 
tagFamilySpec.GetTags()[offset.TagOffset]
-                                               if tagSpec.IndexedOnly {
-                                                       continue
-                                               }
-                                               entityPos := entityMap[tagProj]
-                                               tagFamilyPos := 
tagFamilyMap[tagFamilyProj.Family]
-                                               if entityPos == 0 {
-                                                       continue
-                                               }
-                                               if tagFamilyPos == 0 {
-                                                       
qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{
-                                                               name: 
tagFamilyProj.Family,
-                                                               tags: 
make([]tag, 0),
-                                                       }
-                                               }
-                                               valueType := 
pbv1.MustTagValueToValueType(entityValues[entityPos-1])
-                                               
qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{
-                                                       name:      tagProj,
-                                                       values:    
mustEncodeTagValue(tagProj, tagSpec.GetType(), entityValues[entityPos-1], 
len(qr.data[i].timestamps)),
-                                                       valueType: valueType,
-                                               }
-                                       }
-                               }
-                               if qr.orderByTimestampDesc() {
-                                       qr.data[i].idx = 
len(qr.data[i].timestamps) - 1
-                               }
-                               cursorChan <- -1
-                       }(i)
-               }
-
-               blankCursorList := []int{}
-               for completed := 0; completed < len(qr.data); completed++ {
-                       result := <-cursorChan
-                       if result != -1 {
-                               blankCursorList = append(blankCursorList, 
result)
-                       }
-               }
-               select {
-               case <-ctx.Done():
-                       return &model.StreamResult{
-                               Error: errors.WithMessagef(ctx.Err(), 
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
-                       }
-               default:
-               }
-               sort.Slice(blankCursorList, func(i, j int) bool {
-                       return blankCursorList[i] > blankCursorList[j]
-               })
-               for _, index := range blankCursorList {
-                       releaseBlockCursor(qr.data[index])
-                       qr.data = append(qr.data[:index], qr.data[index+1:]...)
-               }
-               qr.loaded = true
-               heap.Init(qr)
-       }
-       return qr.nextValue()
-}
-
-func (qr *queryResult) nextValue() *model.StreamResult {
-       if len(qr.data) == 0 {
-               return nil
-       }
-       if !qr.orderByTS {
-               return qr.mergeByTagValue()
-       }
-       if len(qr.data) == 1 {
-               r := &model.StreamResult{}
-               bc := qr.data[0]
-               bc.copyAllTo(r, qr.orderByTimestampDesc())
-               qr.releaseBlockCursor()
-               return r
-       }
-       return qr.mergeByTimestamp()
-}
-
-func (qr *queryResult) loadSortingData(ctx context.Context) 
*model.StreamResult {
-       var qo queryOptions
-       qo.StreamQueryOptions = qr.qo.StreamQueryOptions
-       qo.elementFilter = roaring.NewPostingList()
-       qo.seriesToEntity = qr.qo.seriesToEntity
-       qr.elementIDsSorted = qr.elementIDsSorted[:0]
-       count, searchedSize := 1, 0
-       tracer := query.GetTracer(ctx)
-       if tracer != nil {
-               span, _ := tracer.StartSpan(ctx, "load-sorting-data")
-               span.Tagf("max_element_size", "%d", qo.MaxElementSize)
-               if qr.qo.elementFilter != nil {
-                       span.Tag("filter_size", fmt.Sprintf("%d", 
qr.qo.elementFilter.Len()))
-               }
-               defer func() {
-                       span.Tagf("searched_size", "%d", searchedSize)
-                       span.Tagf("count", "%d", count)
-                       span.Stop()
-               }()
-       }
-       for ; qr.sortingIter.Next(); count++ {
-               searchedSize++
-               val := qr.sortingIter.Val()
-               if qr.qo.elementFilter != nil && 
!qr.qo.elementFilter.Contains(val.DocID) {
-                       count--
-                       continue
-               }
-               qo.elementFilter.Insert(val.DocID)
-               if val.Timestamp > qo.maxTimestamp {
-                       qo.maxTimestamp = val.Timestamp
-               }
-               if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 {
-                       qo.minTimestamp = val.Timestamp
-               }
-               qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)
-
-               // Insertion sort
-               insertPos, found := -1, false
-               for i, sid := range qo.sortedSids {
-                       if val.SeriesID == sid {
-                               found = true
-                               break
-                       }
-                       if val.SeriesID < sid {
-                               insertPos = i
-                               break
-                       }
-               }
-
-               if !found {
-                       if insertPos == -1 {
-                               qo.sortedSids = append(qo.sortedSids, 
val.SeriesID)
-                       } else {
-                               qo.sortedSids = 
append(qo.sortedSids[:insertPos], append([]common.SeriesID{val.SeriesID}, 
qo.sortedSids[insertPos:]...)...)
-                       }
-               }
-               if count >= qo.MaxElementSize {
-                       break
-               }
-       }
-       if qo.elementFilter.IsEmpty() {
-               return nil
-       }
-       return qr.load(ctx, qo)
-}
-
-func (qr *queryResult) releaseParts() {
-       qr.releaseBlockCursor()
-       for i := range qr.snapshots {
-               qr.snapshots[i].decRef()
-       }
-       qr.snapshots = qr.snapshots[:0]
-}
-
-func (qr *queryResult) releaseBlockCursor() {
-       for i, v := range qr.data {
-               releaseBlockCursor(v)
-               qr.data[i] = nil
-       }
-       qr.data = qr.data[:0]
-}
-
-func (qr *queryResult) Release() {
-       qr.releaseParts()
-       for i := range qr.segments {
-               qr.segments[i].DecRef()
-       }
-}
-
-func (qr queryResult) Len() int {
-       return len(qr.data)
-}
-
-func (qr queryResult) Less(i, j int) bool {
-       leftIdx, rightIdx := qr.data[i].idx, qr.data[j].idx
-       leftTS := qr.data[i].timestamps[leftIdx]
-       rightTS := qr.data[j].timestamps[rightIdx]
-       if qr.asc {
-               return leftTS < rightTS
-       }
-       return leftTS > rightTS
-}
-
-func (qr queryResult) Swap(i, j int) {
-       qr.data[i], qr.data[j] = qr.data[j], qr.data[i]
-}
-
-func (qr *queryResult) Push(x interface{}) {
-       qr.data = append(qr.data, x.(*blockCursor))
-}
-
-func (qr *queryResult) Pop() interface{} {
-       old := qr.data
-       n := len(old)
-       x := old[n-1]
-       qr.data = old[0 : n-1]
-       releaseBlockCursor(x)
-       return x
-}
-
-func (qr *queryResult) orderByTimestampDesc() bool {
-       return qr.orderByTS && !qr.asc
-}
-
-func (qr *queryResult) mergeByTagValue() *model.StreamResult {
-       defer qr.releaseBlockCursor()
-       tmp := &model.StreamResult{}
-       prevIdx := 0
-       elementIDToIdx := make(map[uint64]int)
-       for _, data := range qr.data {
-               data.copyAllTo(tmp, false)
-               var idx int
-               for idx = prevIdx; idx < len(tmp.Timestamps); idx++ {
-                       elementIDToIdx[tmp.ElementIDs[idx]] = idx
-               }
-               prevIdx = idx
-       }
-
-       r := &model.StreamResult{
-               TagFamilies: []model.TagFamily{},
-       }
-       for _, tagFamily := range tmp.TagFamilies {
-               tf := model.TagFamily{
-                       Name: tagFamily.Name,
-                       Tags: []model.Tag{},
-               }
-               for _, tag := range tagFamily.Tags {
-                       t := model.Tag{
-                               Name:   tag.Name,
-                               Values: []*modelv1.TagValue{},
-                       }
-                       tf.Tags = append(tf.Tags, t)
-               }
-               r.TagFamilies = append(r.TagFamilies, tf)
-       }
-       for _, id := range qr.elementIDsSorted {
-               idx, ok := elementIDToIdx[id]
-               if !ok {
-                       continue
-               }
-               r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx])
-               r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx])
-               for i := 0; i < len(r.TagFamilies); i++ {
-                       for j := 0; j < len(r.TagFamilies[i].Tags); j++ {
-                               r.TagFamilies[i].Tags[j].Values = 
append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx])
-                       }
-               }
-       }
-       return r
-}
-
-func (qr *queryResult) mergeByTimestamp() *model.StreamResult {
-       step := 1
-       if qr.orderByTimestampDesc() {
-               step = -1
-       }
-       result := &model.StreamResult{}
-       var lastSid common.SeriesID
-
-       for qr.Len() > 0 {
-               topBC := qr.data[0]
-               if lastSid != 0 && topBC.bm.seriesID != lastSid {
-                       return result
-               }
-               lastSid = topBC.bm.seriesID
-
-               topBC.copyTo(result)
-               topBC.idx += step
-
-               if qr.orderByTimestampDesc() {
-                       if topBC.idx < 0 {
-                               heap.Pop(qr)
-                       } else {
-                               heap.Fix(qr, 0)
-                       }
-               } else {
-                       if topBC.idx >= len(topBC.timestamps) {
-                               heap.Pop(qr)
-                       } else {
-                               heap.Fix(qr, 0)
-                       }
-               }
-       }
-
-       return result
+func (qo *queryOptions) copyFrom(other *queryOptions) {
+       qo.StreamQueryOptions = other.StreamQueryOptions
+       qo.elementFilter = other.elementFilter
+       qo.seriesToEntity = other.seriesToEntity
+       qo.sortedSids = other.sortedSids
+       qo.minTimestamp = other.minTimestamp
+       qo.maxTimestamp = other.maxTimestamp
 }
 
 func indexSearch(ctx context.Context, sqo model.StreamQueryOptions,
-       tabs []*tsTable, seriesList pbv1.SeriesList,
+       tabs []*tsTable, seriesList []uint64,
 ) (posting.List, error) {
        if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
                return nil, nil
@@ -571,11 +174,15 @@ func indexSearch(ctx context.Context, sqo 
model.StreamQueryOptions,
 }
 
 func (s *stream) indexSort(ctx context.Context, sqo model.StreamQueryOptions, 
tabs []*tsTable,
-       seriesList pbv1.SeriesList,
+       sids []uint64,
 ) (itersort.Iterator[*index.DocumentResult], error) {
        if sqo.Order == nil || sqo.Order.Index == nil {
                return nil, nil
        }
+       seriesList := make([]common.SeriesID, len(sids))
+       for i := range sids {
+               seriesList[i] = common.SeriesID(sids[i])
+       }
        iters, err := s.buildItersByIndex(ctx, tabs, seriesList, sqo)
        if err != nil {
                return nil, err
@@ -585,13 +192,12 @@ func (s *stream) indexSort(ctx context.Context, sqo 
model.StreamQueryOptions, ta
 }
 
 func (s *stream) buildItersByIndex(ctx context.Context, tables []*tsTable,
-       seriesList pbv1.SeriesList, sqo model.StreamQueryOptions,
+       sids []common.SeriesID, sqo model.StreamQueryOptions,
 ) (iters []itersort.Iterator[*index.DocumentResult], err error) {
        indexRuleForSorting := sqo.Order.Index
        if len(indexRuleForSorting.Tags) != 1 {
                return nil, fmt.Errorf("only support one tag for sorting, but 
got %d", len(indexRuleForSorting.Tags))
        }
-       sids := seriesList.IDs()
        for _, tw := range tables {
                var iter index.FieldIterator[*index.DocumentResult]
                fieldKey := index.FieldKey{
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
new file mode 100644
index 00000000..106e556c
--- /dev/null
+++ b/banyand/stream/query_by_idx.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "context"
+       "fmt"
+       "sort"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+       itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       "github.com/apache/skywalking-banyandb/pkg/query"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+type idxResult struct {
+       sortingIter      itersort.Iterator[*index.DocumentResult]
+       sm               *stream
+       tabs             []*tsTable
+       elementIDsSorted []uint64
+       data             []*blockCursor
+       snapshots        []*snapshot
+       segments         []storage.Segment[*tsTable, option]
+       qo               queryOptions
+       loaded           bool
+       asc              bool
+}
+
+func (qr *idxResult) Pull(ctx context.Context) *model.StreamResult {
+       if !qr.loaded {
+               qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize)
+               return qr.loadSortingData(ctx)
+       }
+       if v := qr.nextValue(); v != nil {
+               return v
+       }
+       qr.loaded = false
+       return qr.loadSortingData(ctx)
+}
+
+func (qr *idxResult) scanParts(ctx context.Context, qo queryOptions) error {
+       var parts []*part
+       var n int
+       for i := range qr.tabs {
+               s := qr.tabs[i].currentSnapshot()
+               if s == nil {
+                       continue
+               }
+               parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
+               if n < 1 {
+                       s.decRef()
+                       continue
+               }
+               qr.snapshots = append(qr.snapshots, s)
+       }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
+       defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
+       defer defFn()
+       ti := generateTstIter()
+       defer releaseTstIter(ti)
+       sids := qo.sortedSids
+       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       if ti.Error() != nil {
+               return fmt.Errorf("cannot init tstIter: %w", ti.Error())
+       }
+       var hit int
+       for ti.nextBlock() {
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), 
"interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), 
len(ti.piHeap), len(ti.piPool))
+                       default:
+                       }
+               }
+               hit++
+               bc := generateBlockCursor()
+               p := ti.piHeap[0]
+               bc.init(p.p, p.curBlock, qo)
+               qr.data = append(qr.data, bc)
+       }
+       if ti.Error() != nil {
+               return fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
+       }
+       return nil
+}
+
+func (qr *idxResult) load(ctx context.Context, qo queryOptions) 
*model.StreamResult {
+       if qr.loaded {
+               qr.nextValue()
+       }
+       if err := qr.scanParts(ctx, qo); err != nil {
+               return &model.StreamResult{
+                       Error: err,
+               }
+       }
+       if len(qr.data) == 0 {
+               return nil
+       }
+
+       cursorChan := make(chan int, len(qr.data))
+       for i := 0; i < len(qr.data); i++ {
+               go func(i int) {
+                       select {
+                       case <-ctx.Done():
+                               cursorChan <- i
+                               return
+                       default:
+                       }
+                       if qr.sm.schema.GetEntity() == nil || 
len(qr.sm.schema.GetEntity().GetTagNames()) == 0 {
+                               cursorChan <- -1
+                               return
+                       }
+                       tmpBlock := generateBlock()
+                       defer releaseBlock(tmpBlock)
+                       if loadBlockCursor(qr.data[i], tmpBlock, qo, qr.sm) {
+                               cursorChan <- -1
+                               return
+                       }
+                       cursorChan <- i
+               }(i)
+       }
+
+       blankCursorList := []int{}
+       for completed := 0; completed < len(qr.data); completed++ {
+               result := <-cursorChan
+               if result != -1 {
+                       blankCursorList = append(blankCursorList, result)
+               }
+       }
+       select {
+       case <-ctx.Done():
+               return &model.StreamResult{
+                       Error: errors.WithMessagef(ctx.Err(), "interrupt: 
blank/total=%d/%d", len(blankCursorList), len(qr.data)),
+               }
+       default:
+       }
+       sort.Slice(blankCursorList, func(i, j int) bool {
+               return blankCursorList[i] > blankCursorList[j]
+       })
+       for _, index := range blankCursorList {
+               releaseBlockCursor(qr.data[index])
+               qr.data = append(qr.data[:index], qr.data[index+1:]...)
+       }
+       qr.loaded = true
+       return qr.nextValue()
+}
+
+func (qr *idxResult) nextValue() *model.StreamResult {
+       if len(qr.data) == 0 {
+               return nil
+       }
+       return qr.mergeByTagValue()
+}
+
+func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult {
+       var qo queryOptions
+       qo.StreamQueryOptions = qr.qo.StreamQueryOptions
+       qo.elementFilter = roaring.NewPostingList()
+       qo.seriesToEntity = qr.qo.seriesToEntity
+       qr.elementIDsSorted = qr.elementIDsSorted[:0]
+       count, searchedSize := 1, 0
+       tracer := query.GetTracer(ctx)
+       if tracer != nil {
+               span, _ := tracer.StartSpan(ctx, "load-sorting-data")
+               span.Tagf("max_element_size", "%d", qo.MaxElementSize)
+               if qr.qo.elementFilter != nil {
+                       span.Tag("filter_size", fmt.Sprintf("%d", 
qr.qo.elementFilter.Len()))
+               }
+               defer func() {
+                       span.Tagf("searched_size", "%d", searchedSize)
+                       span.Tagf("count", "%d", count)
+                       span.Stop()
+               }()
+       }
+       for ; qr.sortingIter.Next(); count++ {
+               searchedSize++
+               val := qr.sortingIter.Val()
+               if qr.qo.elementFilter != nil && 
!qr.qo.elementFilter.Contains(val.DocID) {
+                       count--
+                       continue
+               }
+               qo.elementFilter.Insert(val.DocID)
+               if val.Timestamp > qo.maxTimestamp {
+                       qo.maxTimestamp = val.Timestamp
+               }
+               if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 {
+                       qo.minTimestamp = val.Timestamp
+               }
+               qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)
+
+               // Insertion sort
+               insertPos, found := -1, false
+               for i, sid := range qo.sortedSids {
+                       if val.SeriesID == sid {
+                               found = true
+                               break
+                       }
+                       if val.SeriesID < sid {
+                               insertPos = i
+                               break
+                       }
+               }
+
+               if !found {
+                       if insertPos == -1 {
+                               qo.sortedSids = append(qo.sortedSids, 
val.SeriesID)
+                       } else {
+                               qo.sortedSids = 
append(qo.sortedSids[:insertPos], append([]common.SeriesID{val.SeriesID}, 
qo.sortedSids[insertPos:]...)...)
+                       }
+               }
+               if count >= qo.MaxElementSize {
+                       break
+               }
+       }
+       if qo.elementFilter.IsEmpty() {
+               return nil
+       }
+       return qr.load(ctx, qo)
+}
+
+func (qr *idxResult) releaseParts() {
+       qr.releaseBlockCursor()
+       for i := range qr.snapshots {
+               qr.snapshots[i].decRef()
+       }
+       qr.snapshots = qr.snapshots[:0]
+}
+
+func (qr *idxResult) releaseBlockCursor() {
+       for i, v := range qr.data {
+               releaseBlockCursor(v)
+               qr.data[i] = nil
+       }
+       qr.data = qr.data[:0]
+}
+
+func (qr *idxResult) Release() {
+       qr.releaseParts()
+       for i := range qr.segments {
+               qr.segments[i].DecRef()
+       }
+}
+
+func (qr *idxResult) mergeByTagValue() *model.StreamResult {
+       defer qr.releaseBlockCursor()
+       tmp := &model.StreamResult{}
+       prevIdx := 0
+       elementIDToIdx := make(map[uint64]int)
+       for _, data := range qr.data {
+               data.copyAllTo(tmp, false)
+               var idx int
+               for idx = prevIdx; idx < len(tmp.Timestamps); idx++ {
+                       elementIDToIdx[tmp.ElementIDs[idx]] = idx
+               }
+               prevIdx = idx
+       }
+
+       r := &model.StreamResult{
+               TagFamilies: []model.TagFamily{},
+       }
+       for _, tagFamily := range tmp.TagFamilies {
+               tf := model.TagFamily{
+                       Name: tagFamily.Name,
+                       Tags: []model.Tag{},
+               }
+               for _, tag := range tagFamily.Tags {
+                       t := model.Tag{
+                               Name:   tag.Name,
+                               Values: []*modelv1.TagValue{},
+                       }
+                       tf.Tags = append(tf.Tags, t)
+               }
+               r.TagFamilies = append(r.TagFamilies, tf)
+       }
+       for _, id := range qr.elementIDsSorted {
+               idx, ok := elementIDToIdx[id]
+               if !ok {
+                       continue
+               }
+               r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx])
+               r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx])
+               for i := 0; i < len(r.TagFamilies); i++ {
+                       for j := 0; j < len(r.TagFamilies[i].Tags); j++ {
+                               r.TagFamilies[i].Tags[j].Values = 
append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx])
+                       }
+               }
+       }
+       return r
+}
+
+var bypassQueryResultInstance = &bypassQueryResult{}
+
+type bypassQueryResult struct{}
+
+func (bypassQueryResult) Pull(context.Context) *model.StreamResult {
+       return nil
+}
+
+func (bypassQueryResult) Release() {}
diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go
new file mode 100644
index 00000000..0c9ccb11
--- /dev/null
+++ b/banyand/stream/query_by_ts.go
@@ -0,0 +1,265 @@
+package stream
+
+import (
+       "container/heap"
+       "context"
+       "sync"
+
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+var _ model.StreamQueryResult = (*tsResult)(nil)
+
+type tsResult struct {
+       sm       *stream
+       segments []storage.Segment[*tsTable, option]
+       series   []*pbv1.Series
+       shards   []*model.StreamResult
+       qo       queryOptions
+       asc      bool
+}
+
+func (t *tsResult) Pull(ctx context.Context) *model.StreamResult {
+       if len(t.segments) == 0 {
+               return &model.StreamResult{}
+       }
+       if err := t.scanSegment(ctx); err != nil {
+               return &model.StreamResult{Error: err}
+       }
+       var err error
+       for i := range t.shards {
+               if t.shards[i].Error != nil {
+                       err = multierr.Append(err, t.shards[i].Error)
+               }
+       }
+       if err != nil {
+               return &model.StreamResult{Error: err}
+       }
+       return model.MergeStreamResults(t.shards, t.qo.MaxElementSize, t.asc)
+}
+
+func (t *tsResult) scanSegment(ctx context.Context) error {
+       var segment storage.Segment[*tsTable, option]
+       if t.asc {
+               segment = t.segments[len(t.segments)-1]
+               t.segments = t.segments[:len(t.segments)-1]
+       } else {
+               segment = t.segments[0]
+               t.segments = t.segments[1:]
+       }
+
+       bs := blockScanner{
+               segment: segment,
+               qo:      t.qo,
+               series:  t.series,
+       }
+       defer bs.close()
+       if err := bs.searchSeries(ctx); err != nil {
+               return err
+       }
+       workerSize := cgroups.CPUs()
+       var workerWg sync.WaitGroup
+       batchCh := make(chan *blockScanResultBatch, workerSize)
+       workerWg.Add(workerSize)
+       if t.shards == nil {
+               t.shards = make([]*model.StreamResult, workerSize)
+               for i := range t.shards {
+                       t.shards[i] = 
model.NewStreamResult(t.qo.MaxElementSize, t.asc)
+               }
+       } else {
+               for i := range t.shards {
+                       t.shards[i].Reset()
+               }
+       }
+       for i := 0; i < workerSize; i++ {
+               go func(workerID int) {
+                       tmpBlock := generateBlock()
+                       defer releaseBlock(tmpBlock)
+                       blockHeap := generateBlockCursorHeap(t.asc)
+                       defer releaseBlockCursorHeap(blockHeap)
+                       tmpResult := model.NewStreamResult(t.qo.MaxElementSize, 
t.asc)
+                       for batch := range batchCh {
+                               if batch.err != nil {
+                                       t.shards[workerID].Error = batch.err
+                                       releaseBlockScanResultBatch(batch)
+                                       continue
+                               }
+                               for _, bs := range batch.bss {
+                                       bc := generateBlockCursor()
+                                       bc.init(bs.p, &bs.bm, bs.qo)
+                                       if loadBlockCursor(bc, tmpBlock, bs.qo, 
t.sm) {
+                                               if !t.asc {
+                                                       bc.idx = 
len(bc.timestamps) - 1
+                                               }
+                                               blockHeap.Push(bc)
+                                       }
+                               }
+                               releaseBlockScanResultBatch(batch)
+                               heap.Init(blockHeap)
+                               result := blockHeap.merge(t.qo.MaxElementSize)
+                               t.shards[workerID].CopyFrom(tmpResult, result)
+                               blockHeap.reset()
+                       }
+                       workerWg.Done()
+               }(i)
+       }
+
+       var scannerWg sync.WaitGroup
+       finalizers := bs.scanShardsInParallel(ctx, &scannerWg, batchCh)
+       scannerWg.Wait()
+       close(batchCh)
+       workerWg.Wait()
+       for i := range finalizers {
+               if finalizers[i] != nil {
+                       finalizers[i]()
+               }
+       }
+       return nil
+}
+
+func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm 
*stream) bool {
+       tmpBlock.reset()
+       if !bc.loadData(tmpBlock) {
+               releaseBlockCursor(bc)
+               return false
+       }
+       entityValues := qo.seriesToEntity[bc.bm.seriesID]
+
+       tagFamilyMap := make(map[string]int)
+       for idx, tagFamily := range bc.tagFamilies {
+               tagFamilyMap[tagFamily.name] = idx + 1
+       }
+       for _, tagFamilyProj := range bc.tagProjection {
+               for j, tagProj := range tagFamilyProj.Names {
+                       tagSpec := sm.tagMap[tagProj]
+                       if tagSpec.IndexedOnly {
+                               continue
+                       }
+                       entityPos := sm.entityMap[tagProj]
+                       if entityPos == 0 {
+                               continue
+                       }
+                       tagFamilyPos := tagFamilyMap[tagFamilyProj.Family]
+                       if tagFamilyPos == 0 {
+                               bc.tagFamilies[tagFamilyPos-1] = tagFamily{
+                                       name: tagFamilyProj.Family,
+                                       tags: make([]tag, 0),
+                               }
+                       }
+                       valueType := 
pbv1.MustTagValueToValueType(entityValues[entityPos-1])
+                       bc.tagFamilies[tagFamilyPos-1].tags[j] = tag{
+                               name:      tagProj,
+                               values:    mustEncodeTagValue(tagProj, 
tagSpec.GetType(), entityValues[entityPos-1], len(bc.timestamps)),
+                               valueType: valueType,
+                       }
+               }
+       }
+       return true
+}
+
+func (t *tsResult) Release() {
+       for i := range t.segments {
+               t.segments[i].DecRef()
+       }
+}
+
+type blockCursorHeap struct {
+       bcc []*blockCursor
+       asc bool
+}
+
+func (bch blockCursorHeap) Len() int {
+       return len(bch.bcc)
+}
+
+func (bch blockCursorHeap) Less(i, j int) bool {
+       leftIdx, rightIdx := bch.bcc[i].idx, bch.bcc[j].idx
+       leftTS := bch.bcc[i].timestamps[leftIdx]
+       rightTS := bch.bcc[j].timestamps[rightIdx]
+       if bch.asc {
+               return leftTS < rightTS
+       }
+       return leftTS > rightTS
+}
+
+func (bch *blockCursorHeap) Swap(i, j int) {
+       bch.bcc[i], bch.bcc[j] = bch.bcc[j], bch.bcc[i]
+}
+
+func (bch *blockCursorHeap) Push(x interface{}) {
+       bch.bcc = append(bch.bcc, x.(*blockCursor))
+}
+
+func (bch *blockCursorHeap) Pop() interface{} {
+       old := bch.bcc
+       n := len(old)
+       x := old[n-1]
+       bch.bcc = old[0 : n-1]
+       releaseBlockCursor(x)
+       return x
+}
+
+func (bch *blockCursorHeap) reset() {
+       for i := range bch.bcc {
+               releaseBlockCursor(bch.bcc[i])
+       }
+       bch.bcc = bch.bcc[:0]
+}
+
+func (bch *blockCursorHeap) merge(limit int) *model.StreamResult {
+       step := -1
+       if bch.asc {
+               step = 1
+       }
+       result := &model.StreamResult{}
+
+       for bch.Len() > 0 {
+               topBC := bch.bcc[0]
+               topBC.copyTo(result)
+               if result.Len() >= limit {
+                       break
+               }
+               topBC.idx += step
+
+               if bch.asc {
+                       if topBC.idx >= len(topBC.timestamps) {
+                               heap.Pop(bch)
+                       } else {
+                               heap.Fix(bch, 0)
+                       }
+               } else {
+                       if topBC.idx < 0 {
+                               heap.Pop(bch)
+                       } else {
+                               heap.Fix(bch, 0)
+                       }
+               }
+       }
+
+       return result
+}
+
+var blockCursorHeapPool = 
pool.Register[*blockCursorHeap]("stream-blockCursorHeap")
+
+func generateBlockCursorHeap(asc bool) *blockCursorHeap {
+       v := blockCursorHeapPool.Get()
+       if v == nil {
+               return &blockCursorHeap{
+                       asc: asc,
+                       bcc: make([]*blockCursor, 0, blockScannerBatchSize),
+               }
+       }
+       v.asc = asc
+       return v
+}
+
+func releaseBlockCursorHeap(bch *blockCursorHeap) {
+       bch.reset()
+       blockCursorHeapPool.Put(bch)
+}
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
deleted file mode 100644
index 9eea344d..00000000
--- a/banyand/stream/query_test.go
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package stream
-
-import (
-       "context"
-       "errors"
-       "sort"
-       "testing"
-       "time"
-
-       "github.com/google/go-cmp/cmp"
-       "github.com/stretchr/testify/require"
-       "google.golang.org/protobuf/testing/protocmp"
-
-       "github.com/apache/skywalking-banyandb/api/common"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/fs"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/query/model"
-       "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/test"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-       "github.com/apache/skywalking-banyandb/pkg/watcher"
-)
-
-func TestQueryResult(t *testing.T) {
-       tests := []struct {
-               wantErr       error
-               name          string
-               esList        []*elements
-               sids          []common.SeriesID
-               want          []model.StreamResult
-               minTimestamp  int64
-               maxTimestamp  int64
-               orderBySeries bool
-               ascTS         bool
-       }{
-               {
-                       name:         "Test with multiple parts with duplicated 
data order by TS",
-                       esList:       []*elements{esTS1, esTS1},
-                       sids:         []common.SeriesID{1, 2, 3},
-                       minTimestamp: 1,
-                       maxTimestamp: 1,
-                       want: []model.StreamResult{{
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{1},
-                               ElementIDs: []uint64{11},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:        []common.SeriesID{3, 3},
-                               Timestamps:  []int64{1, 1},
-                               ElementIDs:  []uint64{31, 31},
-                               TagFamilies: emptyTagFamilies(2),
-                       }, {
-                               SIDs:       []common.SeriesID{2, 2},
-                               Timestamps: []int64{1, 1},
-                               ElementIDs: []uint64{21, 21},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag1")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag2")}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{1},
-                               ElementIDs: []uint64{11},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }},
-               },
-               {
-                       name:         "Test with multiple parts with multiple 
data orderBy TS desc",
-                       esList:       []*elements{esTS1, esTS2},
-                       sids:         []common.SeriesID{1, 2, 3},
-                       minTimestamp: 1,
-                       maxTimestamp: 2,
-                       want: []model.StreamResult{{
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{2},
-                               ElementIDs: []uint64{12},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value3")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(30)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:       []common.SeriesID{2},
-                               Timestamps: []int64{2},
-                               ElementIDs: []uint64{22},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag3")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag4")}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:        []common.SeriesID{3},
-                               Timestamps:  []int64{2},
-                               ElementIDs:  []uint64{32},
-                               TagFamilies: emptyTagFamilies(1),
-                       }, {
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{1},
-                               ElementIDs: []uint64{11},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:        []common.SeriesID{3},
-                               Timestamps:  []int64{1},
-                               ElementIDs:  []uint64{31},
-                               TagFamilies: emptyTagFamilies(1),
-                       }, {
-                               SIDs:       []common.SeriesID{2},
-                               Timestamps: []int64{1},
-                               ElementIDs: []uint64{21},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag1")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag2")}},
-                                       }},
-                               },
-                       }},
-               },
-               {
-                       name:         "Test with multiple parts with multiple 
data orderBy TS asc",
-                       esList:       []*elements{esTS1, esTS2},
-                       sids:         []common.SeriesID{1, 2, 3},
-                       ascTS:        true,
-                       minTimestamp: 1,
-                       maxTimestamp: 2,
-                       want: []model.StreamResult{{
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{1},
-                               ElementIDs: []uint64{11},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:        []common.SeriesID{3},
-                               Timestamps:  []int64{1},
-                               ElementIDs:  []uint64{31},
-                               TagFamilies: emptyTagFamilies(1),
-                       }, {
-                               SIDs:       []common.SeriesID{2, 2},
-                               Timestamps: []int64{1, 2},
-                               ElementIDs: []uint64{21, 22},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue, pbv1.NullTagValue}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag3")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag4")}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:       []common.SeriesID{1},
-                               Timestamps: []int64{2},
-                               ElementIDs: []uint64{12},
-                               TagFamilies: []model.TagFamily{
-                                       {Name: "arrTag", Tags: []model.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value5", "value6"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{35, 40})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []model.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []model.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value3")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(30)}},
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
-                                       }},
-                               },
-                       }, {
-                               SIDs:        []common.SeriesID{3},
-                               Timestamps:  []int64{2},
-                               ElementIDs:  []uint64{32},
-                               TagFamilies: emptyTagFamilies(1),
-                       }},
-               },
-       }
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       verify := func(t *testing.T, tst *tsTable) {
-                               var result queryResult
-
-                               result.qo = queryOptions{
-                                       minTimestamp: tt.minTimestamp,
-                                       maxTimestamp: tt.maxTimestamp,
-                                       sortedSids:   tt.sids,
-                                       StreamQueryOptions: 
model.StreamQueryOptions{
-                                               TagProjection: tagProjectionAll,
-                                       },
-                               }
-                               result.tabs = []*tsTable{tst}
-                               defer result.Release()
-                               if !tt.orderBySeries {
-                                       result.orderByTS = true
-                                       result.asc = tt.ascTS
-                               }
-                               var got []model.StreamResult
-                               ctx := context.Background()
-                               for {
-                                       r := result.Pull(ctx)
-                                       if r == nil {
-                                               break
-                                       }
-                                       if !errors.Is(r.Error, tt.wantErr) {
-                                               t.Errorf("Unexpected error: got 
%v, want %v", r.Error, tt.wantErr)
-                                       }
-                                       sort.Slice(r.TagFamilies, func(i, j 
int) bool {
-                                               return r.TagFamilies[i].Name < 
r.TagFamilies[j].Name
-                                       })
-                                       got = append(got, *r)
-                               }
-
-                               if diff := cmp.Diff(got, tt.want,
-                                       protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
-                                       t.Errorf("Unexpected []pbv1.Result 
(-got +want):\n%s", diff)
-                               }
-                       }
-
-                       t.Run("memory snapshot", func(t *testing.T) {
-                               tmpPath, defFn := test.Space(require.New(t))
-                               defer defFn()
-                               index, _ := newElementIndex(context.TODO(), 
tmpPath, 0, nil)
-                               tst := &tsTable{
-                                       index:         index,
-                                       loopCloser:    run.NewCloser(2),
-                                       introductions: make(chan *introduction),
-                                       fileSystem:    fs.NewLocalFileSystem(),
-                                       root:          tmpPath,
-                               }
-                               tst.gc.init(tst)
-                               flushCh := make(chan *flusherIntroduction)
-                               mergeCh := make(chan *mergerIntroduction)
-                               introducerWatcher := make(watcher.Channel, 1)
-                               go tst.introducerLoop(flushCh, mergeCh, 
introducerWatcher, 1)
-                               for _, es := range tt.esList {
-                                       tst.mustAddElements(es)
-                                       time.Sleep(100 * time.Millisecond)
-                               }
-                               verify(t, tst)
-                       })
-
-                       t.Run("file snapshot", func(t *testing.T) {
-                               // Initialize a tstIter object.
-                               tmpPath, defFn := test.Space(require.New(t))
-                               fileSystem := fs.NewLocalFileSystem()
-                               defer defFn()
-                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{},
-                                       // Since Stream deduplicate data in 
merging process, we need to disable the merging in the test.
-                                       logger.GetLogger("test"), 
timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: 
newDisabledMergePolicyForTesting()}, nil)
-                               require.NoError(t, err)
-                               for _, es := range tt.esList {
-                                       tst.mustAddElements(es)
-                                       time.Sleep(100 * time.Millisecond)
-                               }
-                               // wait until the introducer is done
-                               if len(tt.esList) > 0 {
-                                       for {
-                                               snp := tst.currentSnapshot()
-                                               if snp == nil {
-                                                       time.Sleep(100 * 
time.Millisecond)
-                                                       continue
-                                               }
-                                               if snp.creator != 
snapshotCreatorMemPart && len(snp.parts) == len(tt.esList) {
-                                                       snp.decRef()
-                                                       tst.Close()
-                                                       break
-                                               }
-                                               snp.decRef()
-                                               time.Sleep(100 * 
time.Millisecond)
-                                       }
-                               }
-
-                               // reopen the table
-                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{},
-                                       logger.GetLogger("test"), 
timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: 
newDefaultMergePolicyForTesting()}, nil)
-                               require.NoError(t, err)
-
-                               verify(t, tst)
-                       })
-               })
-       }
-}
-
-func emptyTagFamilies(size int) []model.TagFamily {
-       var values []*modelv1.TagValue
-       for i := 0; i < size; i++ {
-               values = append(values, pbv1.NullTagValue)
-       }
-       return []model.TagFamily{
-               {Name: "arrTag", Tags: []model.Tag{
-                       {Name: "strArrTag", Values: values},
-                       {Name: "intArrTag", Values: values},
-               }},
-               {Name: "binaryTag", Tags: []model.Tag{
-                       {Name: "binaryTag", Values: values},
-               }},
-               {Name: "singleTag", Tags: []model.Tag{
-                       {Name: "strTag", Values: values},
-                       {Name: "intTag", Values: values},
-                       {Name: "strTag1", Values: values},
-                       {Name: "strTag2", Values: values},
-               }},
-       }
-}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 406e9bc4..9e4ebaa7 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -70,10 +70,12 @@ type stream struct {
        databaseSupplier  schema.Supplier
        l                 *logger.Logger
        schema            *databasev1.Stream
+       tagMap            map[string]*databasev1.TagSpec
+       entityMap         map[string]int
        name              string
        group             string
-       indexRules        []*databasev1.IndexRule
        indexRuleLocators partition.IndexRuleLocator
+       indexRules        []*databasev1.IndexRule
        shardNum          uint32
 }
 
@@ -92,6 +94,16 @@ func (s *stream) Close() error {
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
        s.indexRuleLocators, _ = 
partition.ParseIndexRuleLocators(s.schema.GetEntity(), 
s.schema.GetTagFamilies(), s.indexRules, false)
+       s.tagMap = make(map[string]*databasev1.TagSpec)
+       for _, tf := range s.schema.GetTagFamilies() {
+               for _, tag := range tf.GetTags() {
+                       s.tagMap[tag.GetName()] = tag
+               }
+       }
+       s.entityMap = make(map[string]int)
+       for idx, entity := range s.schema.GetEntity().GetTagNames() {
+               s.entityMap[entity] = idx + 1
+       }
 }
 
 type streamSpec struct {
diff --git a/banyand/stream/trace.go b/banyand/stream/trace.go
index ed428a67..5bcddecc 100644
--- a/banyand/stream/trace.go
+++ b/banyand/stream/trace.go
@@ -50,7 +50,7 @@ func (bc *blockCursor) String() string {
                bc.p.partMetadata.ID, bc.bm.seriesID, minTimestamp, 
maxTimestamp, bc.bm.count, humanize.Bytes(bc.bm.uncompressedSizeBytes))
 }
 
-func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr 
*queryResult) func() {
+func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr 
*idxResult) func() {
        tracer := query.GetTracer(ctx)
        if tracer == nil {
                return func() {}
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index 42e2a6eb..3945f0bb 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -32,7 +32,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -251,12 +250,6 @@ func Test_tstIter(t *testing.T) {
        })
 }
 
-var tagProjectionAll = []model.TagProjection{
-       {Family: "arrTag", Names: []string{"strArrTag", "intArrTag"}},
-       {Family: "binaryTag", Names: []string{"binaryTag"}},
-       {Family: "singleTag", Names: []string{"strTag", "intTag", "strTag1", 
"strTag2"}},
-}
-
 var esTS1 = &elements{
        seriesIDs:  []common.SeriesID{1, 2, 3},
        timestamps: []int64{1, 1, 1},
diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go
index b9c6fa80..f13fc161 100644
--- a/pkg/pb/v1/value.go
+++ b/pkg/pb/v1/value.go
@@ -130,7 +130,7 @@ func marshalTagValue(dest []byte, tv *modelv1.TagValue) 
([]byte, error) {
        case *modelv1.TagValue_Null:
                dest = marshalEntityValue(dest, nil)
        case *modelv1.TagValue_Str:
-               dest = marshalEntityValue(dest, 
convert.StringToBytes(tv.GetStr().Value))
+               dest = marshalEntityValue(dest, []byte(tv.GetStr().Value))
        case *modelv1.TagValue_Int:
                dest = marshalEntityValue(dest, encoding.Int64ToBytes(nil, 
tv.GetInt().Value))
        case *modelv1.TagValue_BinaryData:
diff --git a/pkg/query/logical/optimizer.go b/pkg/query/logical/optimizer.go
index 471f78b8..0c793d8d 100644
--- a/pkg/query/logical/optimizer.go
+++ b/pkg/query/logical/optimizer.go
@@ -78,7 +78,7 @@ func (pdo PushDownOrder) Optimize(plan Plan) (Plan, error) {
                        pdo.order.GetIndexRuleName(), pdo.order.GetSort()); err 
== nil && order != nil {
                        v.Sort(order)
                } else {
-                       return nil, err
+                       return plan, err
                }
        }
        return plan, nil
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 768e6fad..e40f2581 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -19,11 +19,13 @@
 package model
 
 import (
+       "container/heap"
        "context"
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -96,6 +98,162 @@ type StreamResult struct {
        ElementIDs  []uint64
        TagFamilies []TagFamily
        SIDs        []common.SeriesID
+       topN        int
+       idx         int
+       asc         bool
+}
+
+// NewStreamResult creates a new StreamResult.
+func NewStreamResult(topN int, asc bool) *StreamResult {
+       return &StreamResult{
+               topN:        topN,
+               asc:         asc,
+               Timestamps:  make([]int64, 0, topN),
+               ElementIDs:  make([]uint64, 0, topN),
+               TagFamilies: make([]TagFamily, 0, topN),
+               SIDs:        make([]common.SeriesID, 0, topN),
+       }
+}
+
+// Len returns the length of the StreamResult.
+func (sr *StreamResult) Len() int {
+       return len(sr.Timestamps)
+}
+
+// Reset resets the StreamResult.
+func (sr *StreamResult) Reset() {
+       sr.Error = nil
+       sr.idx = 0
+       sr.Timestamps = sr.Timestamps[:0]
+       sr.ElementIDs = sr.ElementIDs[:0]
+       sr.TagFamilies = sr.TagFamilies[:0]
+       sr.SIDs = sr.SIDs[:0]
+}
+
+// CopyFrom copies the topN results from other to sr using tmp as a temporary 
result.
+func (sr *StreamResult) CopyFrom(tmp, other *StreamResult) bool {
+       // Prepare a reusable tmp result
+       tmp.Reset()
+       tmp.topN = sr.topN
+       tmp.asc = sr.asc
+
+       // Prepare heaps
+       sr.idx = 0
+       other.idx = 0
+
+       h := &StreamResultHeap{asc: sr.asc}
+       heap.Init(h)
+
+       if sr.Len() > 0 {
+               heap.Push(h, sr)
+       }
+       if other.Len() > 0 {
+               heap.Push(h, other)
+       }
+
+       // Pop from heap to build tmp with topN
+       for h.Len() > 0 && tmp.Len() < tmp.topN {
+               res := heap.Pop(h).(*StreamResult)
+               tmp.CopySingleFrom(res)
+               res.idx++
+               if res.idx < res.Len() {
+                       heap.Push(h, res)
+               }
+       }
+
+       // Copy tmp back to sr
+       sr.Reset()
+       sr.Timestamps = append(sr.Timestamps, tmp.Timestamps...)
+       sr.ElementIDs = append(sr.ElementIDs, tmp.ElementIDs...)
+       sr.SIDs = append(sr.SIDs, tmp.SIDs...)
+       sr.TagFamilies = append(sr.TagFamilies, tmp.TagFamilies...)
+
+       return len(sr.Timestamps) >= sr.topN
+}
+
+// CopySingleFrom copies a single result from other to sr.
+func (sr *StreamResult) CopySingleFrom(other *StreamResult) {
+       sr.SIDs = append(sr.SIDs, other.SIDs[other.idx])
+       sr.Timestamps = append(sr.Timestamps, other.Timestamps[other.idx])
+       sr.ElementIDs = append(sr.ElementIDs, other.ElementIDs[other.idx])
+       if len(sr.TagFamilies) < len(other.TagFamilies) {
+               for i := range other.TagFamilies {
+                       tf := TagFamily{
+                               Name: other.TagFamilies[i].Name,
+                               Tags: make([]Tag, 
len(other.TagFamilies[i].Tags)),
+                       }
+                       for j := range tf.Tags {
+                               tf.Tags[j].Name = 
other.TagFamilies[i].Tags[j].Name
+                       }
+                       sr.TagFamilies = append(sr.TagFamilies, tf)
+               }
+       }
+       if len(sr.TagFamilies) != len(other.TagFamilies) {
+               logger.Panicf("tag family length mismatch: %d != %d", 
len(sr.TagFamilies), len(other.TagFamilies))
+       }
+       for i := range sr.TagFamilies {
+               if len(sr.TagFamilies[i].Tags) != 
len(other.TagFamilies[i].Tags) {
+                       logger.Panicf("tag length mismatch: %d != %d", 
len(sr.TagFamilies[i].Tags), len(other.TagFamilies[i].Tags))
+               }
+               for j := range sr.TagFamilies[i].Tags {
+                       sr.TagFamilies[i].Tags[j].Values = 
append(sr.TagFamilies[i].Tags[j].Values, 
other.TagFamilies[i].Tags[j].Values[other.idx])
+               }
+       }
+}
+
+// StreamResultHeap is a min-heap of StreamResult pointers.
+type StreamResultHeap struct {
+       data []*StreamResult
+       asc  bool
+}
+
+func (h StreamResultHeap) Len() int { return len(h.data) }
+func (h StreamResultHeap) Less(i, j int) bool {
+       if h.asc {
+               return h.data[i].Timestamps[h.data[i].idx] < 
h.data[j].Timestamps[h.data[j].idx]
+       }
+       return h.data[i].Timestamps[h.data[i].idx] > 
h.data[j].Timestamps[h.data[j].idx]
+}
+func (h StreamResultHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], 
h.data[i] }
+
+// Push pushes a StreamResult pointer to the heap.
+func (h *StreamResultHeap) Push(x interface{}) {
+       h.data = append(h.data, x.(*StreamResult))
+}
+
+// Pop pops a StreamResult pointer from the heap.
+func (h *StreamResultHeap) Pop() interface{} {
+       old := h.data
+       n := len(old)
+       x := old[n-1]
+       h.data = old[0 : n-1]
+       return x
+}
+
+// MergeStreamResults merges multiple StreamResult slices into a single 
StreamResult.
+func MergeStreamResults(results []*StreamResult, topN int, asc bool) 
*StreamResult {
+       h := &StreamResultHeap{asc: asc}
+       heap.Init(h)
+
+       for _, result := range results {
+               if result.Len() > 0 {
+                       result.idx = 0
+                       heap.Push(h, result)
+               }
+       }
+
+       mergedResult := NewStreamResult(topN, asc)
+
+       for h.Len() > 0 && mergedResult.Len() < topN {
+               sr := heap.Pop(h).(*StreamResult)
+               mergedResult.CopySingleFrom(sr)
+               sr.idx++
+               if sr.idx < sr.Len() {
+                       heap.Push(h, sr)
+               }
+       }
+
+       return mergedResult
 }
 
 // StreamQueryResult is the result of a stream query.
diff --git a/pkg/query/model/model_test.go b/pkg/query/model/model_test.go
new file mode 100644
index 00000000..dd1750be
--- /dev/null
+++ b/pkg/query/model/model_test.go
@@ -0,0 +1,947 @@
+package model
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+func TestStreamResult_CopyFrom(t *testing.T) {
+       tests := []struct {
+               sr       *StreamResult
+               other    *StreamResult
+               name     string
+               wantTS   []int64
+               wantTags []TagFamily
+               wantSIDs []common.SeriesID
+               topN     int
+               wantLen  int
+               asc      bool
+               wantRet  bool
+       }{
+               {
+                       name:     "both empty",
+                       sr:       NewStreamResult(3, true),
+                       other:    NewStreamResult(3, true),
+                       topN:     3,
+                       asc:      true,
+                       wantLen:  0,
+                       wantRet:  false,
+                       wantTS:   []int64{},
+                       wantTags: []TagFamily{},
+                       wantSIDs: []common.SeriesID{},
+               },
+               {
+                       name: "one with data, one empty",
+                       sr: &StreamResult{
+                               asc:        true,
+                               topN:       3,
+                               Timestamps: []int64{4, 2},
+                               ElementIDs: []uint64{100, 101},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value1"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value2"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value1"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value2"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{1, 2},
+                       },
+                       other:   NewStreamResult(3, true),
+                       topN:    3,
+                       asc:     true,
+                       wantLen: 2,
+                       wantRet: false,
+                       wantTS:  []int64{4, 2},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{1, 2},
+               },
+               {
+                       name: "one empty, one with data",
+                       sr:   NewStreamResult(3, true),
+                       other: &StreamResult{
+                               asc:        true,
+                               topN:       3,
+                               Timestamps: []int64{4, 2},
+                               ElementIDs: []uint64{100, 101},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value1"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value2"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value1"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value2"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{1, 2},
+                       },
+                       topN:    3,
+                       asc:     true,
+                       wantLen: 2,
+                       wantRet: false,
+                       wantTS:  []int64{4, 2},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{1, 2},
+               },
+               {
+                       name: "both have data, combined < topN",
+                       sr: &StreamResult{
+                               asc:        true,
+                               topN:       5,
+                               Timestamps: []int64{10, 30},
+                               ElementIDs: []uint64{1, 2},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value11"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value12"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value21"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value22"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{1, 2},
+                       },
+                       other: &StreamResult{
+                               asc:        true,
+                               topN:       5,
+                               Timestamps: []int64{20},
+                               ElementIDs: []uint64{3},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value13"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value23"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{3},
+                       },
+                       topN:    5,
+                       asc:     true,
+                       wantLen: 3,
+                       wantRet: false,
+                       wantTS:  []int64{10, 20, 30},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value11"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value13"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value12"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value21"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value23"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value22"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{1, 3, 2},
+               },
+               {
+                       name: "both have data, combined >= topN",
+                       sr: &StreamResult{
+                               asc:        false,
+                               topN:       3,
+                               Timestamps: []int64{30, 20, 10},
+                               ElementIDs: []uint64{5, 6, 7},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value11"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value12"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value13"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value21"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value22"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value23"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{1, 2, 3},
+                       },
+                       other: &StreamResult{
+                               asc:        false,
+                               topN:       3,
+                               Timestamps: []int64{50, 40},
+                               ElementIDs: []uint64{8, 9},
+                               TagFamilies: []TagFamily{
+                                       {
+                                               Name: "family1",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value14"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value15"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                                       {
+                                               Name: "family2",
+                                               Tags: []Tag{{
+                                                       Name: "tag1",
+                                                       Values: 
[]*modelv1.TagValue{
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value24"},
+                                                                       },
+                                                               },
+                                                               {
+                                                                       Value: 
&modelv1.TagValue_Str{
+                                                                               
Str: &modelv1.Str{Value: "value25"},
+                                                                       },
+                                                               },
+                                                       },
+                                               }},
+                                       },
+                               },
+                               SIDs: []common.SeriesID{4, 5},
+                       },
+                       topN:    3,
+                       asc:     false,
+                       wantLen: 3,
+                       wantRet: true,
+                       wantTS:  []int64{50, 40, 30},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value14"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value15"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value11"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value24"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value25"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value21"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{4, 5, 1},
+               },
+       }
+       tmp := &StreamResult{}
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := tt.sr.CopyFrom(tmp, tt.other)
+                       assert.Equal(t, tt.wantLen, tt.sr.Len(), "unexpected 
length")
+                       assert.Equal(t, tt.wantRet, got, "unexpected return 
value")
+                       assert.Equal(t, tt.wantTS, tt.sr.Timestamps, 
"unexpected timestamps")
+                       assert.Equal(t, tt.wantTags, tt.sr.TagFamilies, 
"unexpected tag families")
+                       assert.Equal(t, tt.wantSIDs, tt.sr.SIDs, "unexpected 
SIDs")
+               })
+       }
+}
+
+func TestMergeStreamResults(t *testing.T) {
+       tests := []struct {
+               name     string
+               results  []*StreamResult
+               wantTS   []int64
+               wantTags []TagFamily
+               wantSIDs []common.SeriesID
+               topN     int
+               asc      bool
+       }{
+               {
+                       name:     "all empty",
+                       results:  []*StreamResult{NewStreamResult(3, true), 
NewStreamResult(3, true)},
+                       topN:     3,
+                       asc:      true,
+                       wantTS:   []int64{},
+                       wantTags: []TagFamily{},
+                       wantSIDs: []common.SeriesID{},
+               },
+               {
+                       name: "one with data, one empty",
+                       results: []*StreamResult{
+                               {
+                                       asc:        true,
+                                       topN:       3,
+                                       Timestamps: []int64{2, 4},
+                                       ElementIDs: []uint64{100, 101},
+                                       TagFamilies: []TagFamily{
+                                               {
+                                                       Name: "family1",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value1"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value2"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                               {
+                                                       Name: "family2",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value1"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value2"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                       },
+                                       SIDs: []common.SeriesID{1, 2},
+                               },
+                               NewStreamResult(3, true),
+                       },
+                       topN:   3,
+                       asc:    true,
+                       wantTS: []int64{2, 4},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value1"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value2"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{1, 2},
+               },
+               {
+                       name: "both have data, combined < topN",
+                       results: []*StreamResult{
+                               {
+                                       asc:        true,
+                                       topN:       5,
+                                       Timestamps: []int64{10, 30},
+                                       ElementIDs: []uint64{1, 2},
+                                       TagFamilies: []TagFamily{
+                                               {
+                                                       Name: "family1",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value11"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value12"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                               {
+                                                       Name: "family2",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value21"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value22"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                       },
+                                       SIDs: []common.SeriesID{1, 2},
+                               },
+                               {
+                                       asc:        true,
+                                       topN:       5,
+                                       Timestamps: []int64{20},
+                                       ElementIDs: []uint64{3},
+                                       TagFamilies: []TagFamily{
+                                               {
+                                                       Name: "family1",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value13"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                               {
+                                                       Name: "family2",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value23"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                       },
+                                       SIDs: []common.SeriesID{3},
+                               },
+                       },
+                       topN:   5,
+                       asc:    true,
+                       wantTS: []int64{10, 20, 30},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value11"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value13"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value12"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value21"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value23"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value22"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{1, 3, 2},
+               },
+               {
+                       name: "both have data, combined >= topN",
+                       results: []*StreamResult{
+                               {
+                                       asc:        false,
+                                       topN:       3,
+                                       Timestamps: []int64{30, 20, 10},
+                                       ElementIDs: []uint64{5, 6, 7},
+                                       TagFamilies: []TagFamily{
+                                               {
+                                                       Name: "family1",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value11"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value12"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value13"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                               {
+                                                       Name: "family2",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value21"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value22"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value23"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                       },
+                                       SIDs: []common.SeriesID{1, 2, 3},
+                               },
+                               {
+                                       asc:        false,
+                                       topN:       3,
+                                       Timestamps: []int64{50, 40},
+                                       ElementIDs: []uint64{8, 9},
+                                       TagFamilies: []TagFamily{
+                                               {
+                                                       Name: "family1",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value14"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value15"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                               {
+                                                       Name: "family2",
+                                                       Tags: []Tag{{
+                                                               Name: "tag1",
+                                                               Values: 
[]*modelv1.TagValue{
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value24"},
+                                                                               
},
+                                                                       },
+                                                                       {
+                                                                               
Value: &modelv1.TagValue_Str{
+                                                                               
        Str: &modelv1.Str{Value: "value25"},
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       }},
+                                               },
+                                       },
+                                       SIDs: []common.SeriesID{4, 5},
+                               },
+                       },
+                       topN:   3,
+                       asc:    false,
+                       wantTS: []int64{50, 40, 30},
+                       wantTags: []TagFamily{
+                               {
+                                       Name: "family1",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value14"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value15"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value11"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                               {
+                                       Name: "family2",
+                                       Tags: []Tag{{
+                                               Name: "tag1",
+                                               Values: []*modelv1.TagValue{
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value24"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value25"},
+                                                               },
+                                                       },
+                                                       {
+                                                               Value: 
&modelv1.TagValue_Str{
+                                                                       Str: 
&modelv1.Str{Value: "value21"},
+                                                               },
+                                                       },
+                                               },
+                                       }},
+                               },
+                       },
+                       wantSIDs: []common.SeriesID{4, 5, 1},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := MergeStreamResults(tt.results, tt.topN, tt.asc)
+                       assert.Equal(t, tt.wantTS, got.Timestamps, "unexpected 
timestamps")
+                       assert.Equal(t, tt.wantTags, got.TagFamilies, 
"unexpected tag families")
+                       assert.Equal(t, tt.wantSIDs, got.SIDs, "unexpected 
SIDs")
+               })
+       }
+}


Reply via email to