This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 50af806f Add preload for the TopN query of index (#425)
50af806f is described below

commit 50af806fc7d6938cd31adb0dc403d5e5fb82fbe9
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Apr 1 08:53:17 2024 +0800

    Add preload for the TopN query of index (#425)
---
 CHANGES.md                            |   1 +
 banyand/internal/storage/index.go     |   4 +-
 banyand/internal/storage/storage.go   |   2 +-
 banyand/measure/query.go              |   6 +-
 banyand/stream/index.go               |   4 +-
 banyand/stream/iter_builder.go        |  18 ++----
 banyand/tsdb/series_seek_sort.go      |   6 +-
 banyand/tsdb/seriesdb.go              |   4 +-
 pkg/index/index.go                    |   7 ++-
 pkg/index/inverted/inverted.go        |  51 ++++++++++------
 pkg/index/inverted/sort.go            | 112 ++++++++++++++++++++++++++++++++++
 pkg/index/lsm/search.go               |   5 +-
 pkg/index/testcases/duration.go       |  49 ++++++++++++---
 pkg/iter/sort/sort.go                 |   8 +--
 test/stress/trace/Makefile            |  11 +++-
 test/stress/trace/trace_suite_test.go |   2 +-
 16 files changed, 228 insertions(+), 62 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index aa619a73..7dcb06f3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,7 @@ Release Notes.
   - Flush memory data to disk.
   - Merge memory data and disk data.
 - Add HTTP services to TopNAggregation operations.
+- Add preload for the TopN query of index.
 
 ### Bugs
 
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index bf50a0c8..e85b2178 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -149,7 +149,7 @@ func convertIndexSeriesToSeriesList(indexSeries 
[]index.Series) (pbv1.SeriesList
        return seriesList, nil
 }
 
-func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter 
index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) {
+func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter 
index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error) {
        seriesList, err := s.searchPrimary(ctx, series)
        if err != nil {
                return nil, err
@@ -176,7 +176,7 @@ func (s *seriesIndex) Search(ctx context.Context, series 
*pbv1.Series, filter in
        fieldKey := index.FieldKey{
                IndexRuleID: order.Index.GetMetadata().Id,
        }
-       iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort)
+       iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, 
preloadSize)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index a2feb960..bce8c8b7 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -66,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T
 // IndexDB is the interface of index database.
 type IndexDB interface {
        Write(docs index.Documents) error
-       Search(ctx context.Context, series *pbv1.Series, filter index.Filter, 
order *pbv1.OrderBy) (pbv1.SeriesList, error)
+       Search(ctx context.Context, series *pbv1.Series, filter index.Filter, 
order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error)
 }
 
 // TSDB allows listing and getting shard details.
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index cf34a914..b7da2ad6 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -37,6 +37,10 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+const (
+       preloadSize = 100
+)
+
 // Query allow to retrieve measure data points.
 type Query interface {
        LoadGroup(name string) (resourceSchema.Group, bool)
@@ -83,7 +87,7 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
                }
        }()
 
-       sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
+       sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order, preloadSize)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 60f246f8..20c02de4 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -52,8 +52,8 @@ func newElementIndex(ctx context.Context, root string, 
flushTimeoutSeconds int64
        return ei, nil
 }
 
-func (e *elementIndex) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
-       iter, err := e.store.Iterator(fieldKey, termRange, order)
+func (e *elementIndex) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, order modelv1.Sort, preloadSize int) (index.FieldIterator, 
error) {
+       iter, err := e.store.Iterator(fieldKey, termRange, order, preloadSize)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go
index 8c7a9d7b..775a910e 100644
--- a/banyand/stream/iter_builder.go
+++ b/banyand/stream/iter_builder.go
@@ -20,8 +20,6 @@ package stream
 import (
        "time"
 
-       "github.com/pkg/errors"
-
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -32,10 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var (
-       errUnspecifiedIndexType = errors.New("Unspecified index type")
-       rangeOpts               = index.RangeOpts{}
-)
+var rangeOpts = index.RangeOpts{}
 
 type filterFn func(item item) bool
 
@@ -48,6 +43,7 @@ type iterBuilder struct {
        tagProjection       []pbv1.TagProjection
        seriesID            common.SeriesID
        order               modelv1.Sort
+       preloadSize         int
 }
 
 func newIterBuilder(tableWrappers []storage.TSTableWrapper[*tsTable], id 
common.SeriesID, sso pbv1.StreamSortOptions) *iterBuilder {
@@ -60,6 +56,7 @@ func newIterBuilder(tableWrappers 
[]storage.TSTableWrapper[*tsTable], id common.
                tagProjection:       sso.TagProjection,
                seriesID:            id,
                order:               sso.Order.Sort,
+               preloadSize:         sso.MaxElementSize,
        }
 }
 
@@ -94,14 +91,7 @@ func buildSeriesByIndex(s *iterBuilder) (series 
[]*searcherIterator, err error)
                        IndexRuleID: 
s.indexRuleForSorting.GetMetadata().GetId(),
                        Analyzer:    s.indexRuleForSorting.GetAnalyzer(),
                }
-               switch s.indexRuleForSorting.GetType() {
-               case databasev1.IndexRule_TYPE_TREE:
-                       inner, err = tw.Table().Index().Iterator(fieldKey, 
rangeOpts, s.order)
-               case databasev1.IndexRule_TYPE_INVERTED:
-                       inner, err = tw.Table().Index().Iterator(fieldKey, 
rangeOpts, s.order)
-               case databasev1.IndexRule_TYPE_UNSPECIFIED:
-                       return nil, 
errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", 
s.indexRuleForSorting)
-               }
+               inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, 
s.order, s.preloadSize)
                if err != nil {
                        return nil, err
                }
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 4ea8a2c3..604d5dff 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -18,6 +18,7 @@
 package tsdb
 
 import (
+       "math"
        "sort"
        "time"
 
@@ -85,9 +86,9 @@ func (s *seekerBuilder) buildSeriesByIndex() (series 
[]Iterator, err error) {
                }
                switch s.indexRuleForSorting.GetType() {
                case databasev1.IndexRule_TYPE_TREE:
-                       inner, err = b.lsmIndexReader().Iterator(fieldKey, 
rangeOpts, s.order)
+                       inner, err = b.lsmIndexReader().Iterator(fieldKey, 
rangeOpts, s.order, math.MaxInt)
                case databasev1.IndexRule_TYPE_INVERTED:
-                       inner, err = b.invertedIndexReader().Iterator(fieldKey, 
rangeOpts, s.order)
+                       inner, err = b.invertedIndexReader().Iterator(fieldKey, 
rangeOpts, s.order, math.MaxInt)
                case databasev1.IndexRule_TYPE_UNSPECIFIED:
                        return nil, 
errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", 
s.indexRuleForSorting)
                }
@@ -132,6 +133,7 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, 
error) {
                                },
                                termRange,
                                s.order,
+                               math.MaxInt,
                        )
                if err != nil {
                        return nil, err
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 803f7d14..94614959 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -497,9 +497,9 @@ func (s *seriesDB) Search(ctx context.Context, path Path, 
filter index.Filter, o
        var err error
        switch order.Index.Type {
        case databasev1.IndexRule_TYPE_TREE:
-               iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, order.Sort)
+               iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, 
order.Sort, math.MaxInt)
        case databasev1.IndexRule_TYPE_INVERTED:
-               iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, 
order.Sort)
+               iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, 
order.Sort, math.MaxInt)
        default:
                return nil, errUnspecifiedIndexType
        }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 1796d66b..e776fa1c 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -187,8 +187,9 @@ func (i *dummyIterator) Close() error {
 
 // PostingValue is the collection of a field's values.
 type PostingValue struct {
-       Value posting.List
-       Term  []byte
+       Value   posting.List
+       Term    []byte
+       TermRaw []byte
 }
 
 // Document represents a document in a index.
@@ -215,7 +216,7 @@ type Writer interface {
 
 // FieldIterable allows building a FieldIterator.
 type FieldIterable interface {
-       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort) 
(iter FieldIterator, err error)
+       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, 
preLoadSize int) (iter FieldIterator, err error)
 }
 
 // Searcher allows searching a field either by its key or by its key and term.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 9f4c6cc5..28962031 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -55,8 +55,9 @@ const (
 )
 
 var (
-       defaultUpper = convert.Uint64ToBytes(math.MaxUint64)
-       defaultLower = convert.Uint64ToBytes(0)
+       defaultUpper            = convert.Uint64ToBytes(math.MaxUint64)
+       defaultLower            = convert.Uint64ToBytes(0)
+       defaultRangePreloadSize = 1000
 )
 
 var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
@@ -159,7 +160,7 @@ func (s *store) Write(fields []index.Field, docID uint64) 
error {
        return nil
 }
 
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort) (iter index.FieldIterator, err error) {
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) {
        if termRange.Lower != nil &&
                termRange.Upper != nil &&
                bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
@@ -206,12 +207,15 @@ func (s *store) Iterator(fieldKey index.FieldKey, 
termRange index.RangeOpts, ord
        if order == modelv1.Sort_SORT_DESC {
                sortedKey = "-" + sortedKey
        }
-       documentMatchIterator, err := reader.Search(context.Background(), 
bluge.NewTopNSearch(math.MaxInt64, query).SortBy([]string{sortedKey}))
-       if err != nil {
-               return nil, multierr.Combine(err, reader.Close())
+       result := &sortIterator{
+               query:            query,
+               reader:           reader,
+               sortedKey:        sortedKey,
+               fk:               fk,
+               shouldDecodeTerm: shouldDecodeTerm,
+               size:             preLoadSize,
        }
-       result := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm, reader)
-       return &result, nil
+       return result, nil
 }
 
 func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err 
error) {
@@ -241,7 +245,7 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm, 0, reader)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
@@ -274,7 +278,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, fk, false, 0, 
reader)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
@@ -286,7 +290,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
 }
 
 func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list 
posting.List, err error) {
-       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC)
+       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, 
defaultRangePreloadSize)
        if err != nil {
                return roaring.DummyPostingList, err
        }
@@ -423,14 +427,17 @@ type blugeMatchIterator struct {
        fieldKey         string
        shouldDecodeTerm bool
        closed           bool
+       num              int
+       skip             int
 }
 
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey 
string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey 
string, shouldDecodeTerm bool, skip int, closer io.Closer) blugeMatchIterator {
        return blugeMatchIterator{
                delegated:        delegated,
                fieldKey:         fieldKey,
                shouldDecodeTerm: shouldDecodeTerm,
                closer:           closer,
+               skip:             skip,
        }
 }
 
@@ -463,9 +470,13 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
                }
                return false
        }
+       bmi.num++
+       if bmi.num <= bmi.skip {
+               return true
+       }
        i := 0
        var docID uint64
-       var term []byte
+       var term, termRaw []byte
        bmi.err = match.VisitStoredFields(func(field string, value []byte) bool 
{
                if field == docIDField {
                        if len(value) == 8 {
@@ -479,6 +490,7 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
                if field == bmi.fieldKey {
                        v := y.Copy(value)
                        if bmi.shouldDecodeTerm {
+                               termRaw = v
                                term = index.UnmarshalTerm(v)
                        } else {
                                term = v
@@ -497,8 +509,9 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
        }
        if bmi.agg == nil {
                bmi.agg = &index.PostingValue{
-                       Term:  term,
-                       Value: roaring.NewPostingListWithInitialData(docID),
+                       Term:    term,
+                       TermRaw: termRaw,
+                       Value:   roaring.NewPostingListWithInitialData(docID),
                }
                return true
        }
@@ -508,8 +521,9 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
        }
        bmi.current = bmi.agg
        bmi.agg = &index.PostingValue{
-               Term:  term,
-               Value: roaring.NewPostingListWithInitialData(docID),
+               Term:    term,
+               TermRaw: termRaw,
+               Value:   roaring.NewPostingListWithInitialData(docID),
        }
        return false
 }
@@ -520,5 +534,8 @@ func (bmi *blugeMatchIterator) Val() *index.PostingValue {
 
 func (bmi *blugeMatchIterator) Close() error {
        bmi.closed = true
+       if bmi.closer == nil {
+               return bmi.err
+       }
        return errors.Join(bmi.err, bmi.closer.Close())
 }
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
new file mode 100644
index 00000000..60c434ae
--- /dev/null
+++ b/pkg/index/inverted/sort.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package inverted implements a inverted index repository.
+package inverted
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "io"
+       "math"
+
+       "github.com/blugelabs/bluge"
+
+       "github.com/apache/skywalking-banyandb/pkg/index"
+)
+
+type sortIterator struct {
+       query            bluge.Query
+       err              error
+       reader           *bluge.Reader
+       current          *blugeMatchIterator
+       sortedKey        string
+       fk               string
+       lastKey          []byte
+       currKey          []byte
+       size             int
+       skipped          int
+       shouldDecodeTerm bool
+}
+
+func (si *sortIterator) Next() bool {
+       if si.err != nil {
+               return false
+       }
+       if si.current == nil {
+               return si.loadCurrent()
+       }
+
+       if si.next() {
+               return true
+       }
+       si.current.Close()
+       return si.loadCurrent()
+}
+
+func (si *sortIterator) loadCurrent() bool {
+       size := si.size + si.skipped
+       if size < 0 {
+               // overflow
+               size = math.MaxInt64
+       }
+       topNSearch := bluge.NewTopNSearch(size, 
si.query).SortBy([]string{si.sortedKey})
+       if si.lastKey != nil {
+               topNSearch = topNSearch.After([][]byte{si.lastKey})
+       }
+
+       documentMatchIterator, err := si.reader.Search(context.Background(), 
topNSearch)
+       if err != nil {
+               si.err = err
+               return false
+       }
+
+       iter := newBlugeMatchIterator(documentMatchIterator, si.fk, 
si.shouldDecodeTerm, si.skipped, nil)
+       si.current = &iter
+       if si.next() {
+               return true
+       }
+       si.err = io.EOF
+       return false
+}
+
+func (si *sortIterator) next() bool {
+       if si.current.Next() {
+               currKey := si.current.Val().TermRaw
+               if si.currKey != nil && !bytes.Equal(currKey, si.currKey) {
+                       si.lastKey = si.currKey
+                       si.skipped = 0
+               }
+               si.currKey = currKey
+               si.skipped += si.current.Val().Value.Len()
+               return true
+       }
+       return false
+}
+
+func (si *sortIterator) Val() *index.PostingValue {
+       return si.current.Val()
+}
+
+func (si *sortIterator) Close() error {
+       if errors.Is(si.err, io.EOF) {
+               si.err = nil
+               return errors.Join(si.current.Close(), si.reader.Close())
+       }
+       return errors.Join(si.err, si.current.Close(), si.reader.Close())
+}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index daa21677..eba435cb 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -19,6 +19,7 @@ package lsm
 
 import (
        "bytes"
+       "math"
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
@@ -50,7 +51,7 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
 }
 
 func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list 
posting.List, err error) {
-       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC)
+       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, 
math.MaxInt)
        if err != nil {
                return roaring.DummyPostingList, err
        }
@@ -62,7 +63,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        return
 }
 
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort) (index.FieldIterator, error) {
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort, _ int) (index.FieldIterator, error) {
        if !s.closer.AddRunning() {
                return nil, errors.New("lsm index store is closed")
        }
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 2a61322b..ebbc9d70 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -19,6 +19,7 @@
 package testcases
 
 import (
+       "fmt"
        "sort"
        "testing"
 
@@ -57,8 +58,6 @@ type result struct {
 
 // RunDuration executes duration related cases.
 func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
-       tester := assert.New(t)
-       is := require.New(t)
        tests := []struct {
                name string
                want []int
@@ -262,9 +261,34 @@ func RunDuration(t *testing.T, data map[int]posting.List, 
store SimpleStore) {
                        },
                },
        }
-       for _, tt := range tests {
+       preLoadSizes := []int{7, 20, 50}
+       allTests := make([]struct {
+               name        string
+               want        []int
+               args        args
+               preloadSize int
+       }, 0, len(tests)*len(preLoadSizes))
+
+       for _, size := range preLoadSizes {
+               for _, t := range tests {
+                       allTests = append(allTests, struct {
+                               name        string
+                               want        []int
+                               args        args
+                               preloadSize int
+                       }{
+                               name:        t.name + " preLoadSize " + 
fmt.Sprint(size),
+                               want:        t.want,
+                               preloadSize: size,
+                               args:        t.args,
+                       })
+               }
+       }
+       for _, tt := range allTests {
                t.Run(tt.name, func(t *testing.T) {
-                       iter, err := store.Iterator(tt.args.fieldKey, 
tt.args.termRange, tt.args.orderType)
+                       tester := assert.New(t)
+                       is := require.New(t)
+                       iter, err := store.Iterator(tt.args.fieldKey, 
tt.args.termRange, tt.args.orderType, tt.preloadSize)
                        is.NoError(err)
                        if iter == nil {
                                tester.Empty(tt.want)
@@ -278,11 +302,20 @@ func RunDuration(t *testing.T, data map[int]posting.List, 
store SimpleStore) {
                        }()
                        is.NotNil(iter)
                        got := make([]result, 0)
+                       var currResult result
                        for iter.Next() {
-                               got = append(got, result{
-                                       key:   
int(convert.BytesToInt64(iter.Val().Term)),
-                                       items: toArray(iter.Val().Value),
-                               })
+                               key := 
int(convert.BytesToInt64(iter.Val().Term))
+                               if currResult.key != key {
+                                       if currResult.key != 0 {
+                                               got = append(got, currResult)
+                                               currResult = result{}
+                                       }
+                                       currResult.key = key
+                               }
+                               currResult.items = append(currResult.items, 
toArray(iter.Val().Value)...)
+                       }
+                       if len(currResult.items) > 0 {
+                               got = append(got, currResult)
                        }
                        for i := 0; i < 10; i++ {
                                is.False(iter.Next())
diff --git a/pkg/iter/sort/sort.go b/pkg/iter/sort/sort.go
index b0d67eb5..f8aa1f0c 100644
--- a/pkg/iter/sort/sort.go
+++ b/pkg/iter/sort/sort.go
@@ -21,7 +21,8 @@ package sort
 import (
        "bytes"
        "container/heap"
-       "fmt"
+
+       "go.uber.org/multierr"
 )
 
 // Comparable is an interface that allows sorting of items.
@@ -124,10 +125,7 @@ func (it *itemIter[T]) Val() T {
 func (it *itemIter[T]) Close() error {
        var err error
        for _, iter := range it.iters {
-               if e := iter.Close(); e != nil {
-                       fmt.Println("Error closing iterator:", e)
-                       err = e
-               }
+               err = multierr.Append(err, iter.Close())
        }
        return err
 }
diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile
index f331107e..27c9cbe6 100644
--- a/test/stress/trace/Makefile
+++ b/test/stress/trace/Makefile
@@ -15,6 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+mk_path  := $(abspath $(lastword $(MAKEFILE_LIST)))
+mk_dir   := $(dir $(mk_path))
+root_dir := $(abspath $(mk_dir)/../../..)
+tool_bin := $(root_dir)/bin
+
+include $(root_dir)/scripts/build/version.mk
+include $(root_dir)/scripts/build/ginkgo.mk
 
 NAME := stress
 
@@ -59,6 +66,6 @@ rm_traffic:
        curl -XDELETE 'http://localhost:12800/mock-data/segments/tasks'
 
 .PHONY: test_query
-test_query:
-       go test -v -timeout 1h -run TestQuery ./...
+test_query: $(GINKGO)
+       $(GINKGO) -v -timeout 1h TestQuery ./...
        
\ No newline at end of file
diff --git a/test/stress/trace/trace_suite_test.go 
b/test/stress/trace/trace_suite_test.go
index c450a8db..a749a236 100644
--- a/test/stress/trace/trace_suite_test.go
+++ b/test/stress/trace/trace_suite_test.go
@@ -31,7 +31,7 @@ import (
 )
 
 func TestIntegrationLoad(t *testing.T) {
-       t.Skip("Skip the stress trace test")
+       t.Skip("Skip the stress test")
        RegisterFailHandler(Fail)
        RunSpecs(t, "Stress Trace Suite")
 }

Reply via email to