This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4588b55c Merge the query process of series index (#505)
4588b55c is described below

commit 4588b55cdc005521ca32d504920925c0cb9d62d1
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Aug 9 06:54:43 2024 +0800

    Merge the query process of series index (#505)
    
    * Merge queries of primary index and secondary index
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/index.go                  | 125 ++++++++-------------
 banyand/internal/storage/index_test.go             |   2 +-
 banyand/internal/storage/storage.go                |   3 +-
 pkg/index/index.go                                 |  26 +++--
 pkg/index/inverted/inverted.go                     |  53 +++++----
 pkg/index/inverted/inverted_series.go              |  57 +++++++---
 pkg/index/inverted/inverted_series_test.go         |  16 ++-
 pkg/index/inverted/query.go                        | 122 +++++++++++++-------
 pkg/index/inverted/sort.go                         |  13 ++-
 pkg/index/testcases/duration.go                    |   2 +-
 .../measure/measure_plan_indexscan_local.go        |   3 +-
 pkg/query/model/model.go                           |   3 +-
 12 files changed, 232 insertions(+), 193 deletions(-)

diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index d5950c15..d3b059ae 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "maps"
        "path"
 
        "github.com/pkg/errors"
@@ -27,7 +28,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
-       "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
@@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB {
 }
 
 func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) 
(pbv1.SeriesList, error) {
-       sl, _, err := s.index.searchPrimary(ctx, series, nil)
+       sl, _, err := s.index.filter(ctx, series, nil, nil)
        return sl, err
 }
 
@@ -70,7 +70,9 @@ func (s *seriesIndex) Write(docs index.Documents) error {
 
 var rangeOpts = index.RangeOpts{}
 
-func (s *seriesIndex) searchPrimary(ctx context.Context, series 
[]*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields 
FieldResultList, err error) {
+func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
+       projection []index.FieldKey, secondaryQuery index.Query,
+) (sl pbv1.SeriesList, fields FieldResultList, err error) {
        seriesMatchers := make([]index.SeriesMatcher, len(series))
        for i := range series {
                seriesMatchers[i], err = 
convertEntityValuesToSeriesMatcher(series[i])
@@ -78,10 +80,14 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series []*pbv1.Series,
                        return nil, nil, err
                }
        }
+       indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
+       if err != nil {
+               return nil, nil, err
+       }
        tracer := query.GetTracer(ctx)
        if tracer != nil {
-               span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
-               span.Tagf("matchers", "%v", seriesMatchers)
+               span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
+               span.Tagf("query", "%s", indexQuery.String())
                defer func() {
                        span.Tagf("matched", "%d", len(sl))
                        if len(fields) > 0 {
@@ -93,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, 
series []*pbv1.Series,
                        span.Stop()
                }()
        }
-       ss, err := s.store.Search(ctx, seriesMatchers, projection)
+       ss, err := s.store.Search(ctx, projection, indexQuery)
        if err != nil {
                return nil, nil, err
        }
@@ -191,44 +197,19 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
                        span.Stop()
                }()
        }
-       seriesList, fieldResultList, err := s.searchPrimary(ctx, series, 
opts.Projection)
-       if err != nil {
-               return nil, nil, err
-       }
 
-       pl := seriesList.ToList()
-       if opts.Query != nil {
-               var plFilter posting.List
-               func() {
-                       if tracer != nil {
-                               span, _ := tracer.StartSpan(ctx, "filter")
-                               span.Tag("exp", opts.Query.String())
-                               defer func() {
-                                       if err != nil {
-                                               span.Error(err)
-                                       } else {
-                                               span.Tagf("matched", "%d", 
plFilter.Len())
-                                               span.Tagf("total", "%d", 
pl.Len())
-                                       }
-                                       span.Stop()
-                               }()
-                       }
-                       if plFilter, err = s.store.Execute(ctx, opts.Query); 
err != nil {
-                               return
-                       }
-                       if plFilter == nil {
-                               return
-                       }
-                       err = pl.Intersect(plFilter)
-               }()
+       if opts.Order == nil || opts.Order.Index == nil {
+               var seriesList pbv1.SeriesList
+               var fieldResultList FieldResultList
+               if opts.Query != nil {
+                       seriesList, fieldResultList, err = s.filter(ctx, 
series, opts.Projection, opts.Query)
+               } else {
+                       seriesList, fieldResultList, err = s.filter(ctx, 
series, opts.Projection, nil)
+               }
                if err != nil {
                        return nil, nil, err
                }
-       }
-
-       if opts.Order == nil || opts.Order.Index == nil {
-               sl, frl = filterSeriesList(seriesList, fieldResultList, pl)
-               return sl, frl, nil
+               return seriesList, fieldResultList, nil
        }
 
        fieldKey := index.FieldKey{
@@ -245,8 +226,19 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
                        span.Stop()
                }()
        }
+       seriesMatchers := make([]index.SeriesMatcher, len(series))
+       for i := range series {
+               seriesMatchers[i], err = 
convertEntityValuesToSeriesMatcher(series[i])
+               if err != nil {
+                       return nil, nil, err
+               }
+       }
+       query, err := s.store.BuildQuery(seriesMatchers, opts.Query)
+       if err != nil {
+               return nil, nil, err
+       }
        iter, err := s.store.Iterator(fieldKey, rangeOpts,
-               opts.Order.Sort, opts.PreloadSize)
+               opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
        if err != nil {
                return nil, nil, err
        }
@@ -254,56 +246,29 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
                err = multierr.Append(err, iter.Close())
        }()
 
-       var sortedSeriesList pbv1.SeriesList
-       var sortedFieldResultList FieldResultList
        var r int
+       result := make([]index.SeriesDocument, 0, 10)
        for iter.Next() {
                r++
-               docID := iter.Val().DocID
-               if !pl.Contains(docID) {
-                       continue
-               }
-               sortedSeriesList, sortedFieldResultList = appendSeriesList(
-                       sortedSeriesList, seriesList,
-                       sortedFieldResultList, fieldResultList,
-                       common.SeriesID(docID))
-               if err != nil {
-                       return nil, nil, err
-               }
+               val := iter.Val()
+               var doc index.SeriesDocument
+               doc.Fields = maps.Clone(val.Values)
+               doc.Key.ID = common.SeriesID(val.DocID)
+               doc.Key.EntityValues = val.EntityValues
+               result = append(result, doc)
+       }
+       sortedSeriesList, sortedFieldResultList, err := 
convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
+       if err != nil {
+               return nil, nil, errors.WithMessagef(err, "failed to convert 
index series to series list, matchers: %v, matched: %d", seriesMatchers, 
len(result))
        }
        if span != nil {
+               span.Tagf("query", "%s", iter.Query().String())
                span.Tagf("rounds", "%d", r)
                span.Tagf("size", "%d", len(sortedSeriesList))
        }
        return sortedSeriesList, sortedFieldResultList, err
 }
 
-func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList 
FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) {
-       for i := 0; i < len(seriesList); i++ {
-               if !filter.Contains(uint64(seriesList[i].ID)) {
-                       seriesList = append(seriesList[:i], seriesList[i+1:]...)
-                       if fieldResultList != nil {
-                               fieldResultList = append(fieldResultList[:i], 
fieldResultList[i+1:]...)
-                       }
-                       i--
-               }
-       }
-       return seriesList, fieldResultList
-}
-
-func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL 
FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) {
-       for i := 0; i < len(src); i++ {
-               if target == src[i].ID {
-                       dest = append(dest, src[i])
-                       if srcFRL != nil {
-                               destFRL = append(destFRL, srcFRL[i])
-                       }
-                       break
-               }
-       }
-       return dest, destFRL
-}
-
 func (s *seriesIndex) Close() error {
        return s.store.Close()
 }
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
index d73886a3..2b196ec7 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -157,7 +157,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
                                seriesQuery.EntityValues = tt.entityValues[i]
                                seriesQueries = append(seriesQueries, 
&seriesQuery)
                        }
-                       sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
+                       sl, _, err := si.filter(ctx, seriesQueries, nil, nil)
                        require.NoError(t, err)
                        require.Equal(t, len(tt.entityValues), len(sl))
                        assert.Equal(t, tt.subject, sl[0].Subject)
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index fb7e0af6..47b82639 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -34,7 +34,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T
 
 // IndexSearchOpts is the options for searching index.
 type IndexSearchOpts struct {
-       Query       *inverted.Query
+       Query       index.Query
        Order       *model.OrderBy
        Projection  []index.FieldKey
        PreloadSize int
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 8234f496..a7da177b 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -24,8 +24,6 @@ import (
        "fmt"
        "io"
 
-       "github.com/blugelabs/bluge"
-
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int {
 
 // DocumentResult represents a document in an index.
 type DocumentResult struct {
-       Values      map[string][]byte
-       SortedValue []byte
-       SeriesID    common.SeriesID
-       DocID       uint64
-       Timestamp   int64
+       EntityValues []byte
+       Values       map[string][]byte
+       SortedValue  []byte
+       SeriesID     common.SeriesID
+       DocID        uint64
+       Timestamp    int64
 }
 
 // SortedField returns the value of the sorted field.
@@ -114,6 +113,7 @@ type FieldIterator[T sort.Comparable] interface {
        Next() bool
        Val() T
        Close() error
+       Query() Query
 }
 
 // DummyFieldIterator never iterates.
@@ -133,6 +133,10 @@ func (i *dummyIterator) Close() error {
        return nil
 }
 
+func (i *dummyIterator) Query() Query {
+       return nil
+}
+
 // Document represents a document in an index.
 type Document struct {
        Fields       []Field
@@ -156,7 +160,8 @@ type Writer interface {
 
 // FieldIterable allows building a FieldIterator.
 type FieldIterable interface {
-       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, 
preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
+       BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) 
(Query, error)
+       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, 
preLoadSize int, query Query, fieldKeys []FieldKey) (iter 
FieldIterator[*DocumentResult], err error)
        Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, 
timeRange *timestamp.TimeRange, preLoadSize int) 
(FieldIterator[*DocumentResult], error)
 }
 
@@ -171,9 +176,7 @@ type Searcher interface {
 
 // Query is an abstract of an index query.
 type Query interface {
-       bluge.Query
        fmt.Stringer
-       Query() bluge.Query
 }
 
 // Store is an abstract of an index repository.
@@ -204,8 +207,7 @@ type SeriesDocument struct {
 type SeriesStore interface {
        Store
        // Search returns a list of series that match the given matchers.
-       Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, 
error)
-       Execute(context.Context, Query) (posting.List, error)
+       Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
 }
 
 // SeriesMatcherType represents the type of series matcher.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 81af6e20..866aacc7 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -174,8 +174,8 @@ func (s *store) Close() error {
        return s.writer.Close()
 }
 
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
-       order modelv1.Sort, preLoadSize int,
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort,
+       preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey,
 ) (iter index.FieldIterator[*index.DocumentResult], err error) {
        if termRange.Lower != nil &&
                termRange.Upper != nil &&
@@ -191,7 +191,8 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts,
                return nil, err
        }
        fk := fieldKey.Marshal()
-       query := bluge.NewBooleanQuery()
+       rangeQuery := bluge.NewBooleanQuery()
+       rangeNode := newMustNode()
        addRange := func(query *bluge.BooleanQuery, termRange index.RangeOpts) 
*bluge.BooleanQuery {
                if termRange.Upper == nil {
                        termRange.Upper = defaultUpper
@@ -206,25 +207,39 @@ func (s *store) Iterator(fieldKey index.FieldKey, 
termRange index.RangeOpts,
                        termRange.IncludesUpper,
                ).
                        SetField(fk))
+               
rangeNode.Append(newTermRangeInclusiveNode(string(termRange.Lower), 
string(termRange.Upper), termRange.IncludesLower, termRange.IncludesUpper, nil))
                return query
        }
 
        if fieldKey.HasSeriesID() {
-               query = 
query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
+               rangeQuery = 
rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
                        SetField(seriesIDField))
+               
rangeNode.Append(newTermNode(string(fieldKey.SeriesID.Marshal()), nil))
                if termRange.Lower != nil || termRange.Upper != nil {
-                       query = addRange(query, termRange)
+                       rangeQuery = addRange(rangeQuery, termRange)
                }
        } else {
-               query = addRange(query, termRange)
+               rangeQuery = addRange(rangeQuery, termRange)
        }
 
        sortedKey := fk
        if order == modelv1.Sort_SORT_DESC {
                sortedKey = "-" + sortedKey
        }
+       query := bluge.NewBooleanQuery().AddMust(rangeQuery)
+       node := newMustNode()
+       node.Append(rangeNode)
+       if indexQuery != nil && indexQuery.(*queryNode).query != nil {
+               query.AddMust(indexQuery.(*queryNode).query)
+               node.Append(indexQuery.(*queryNode).node)
+       }
+       fields := make([]string, 0, len(fieldKeys))
+       for i := range fieldKeys {
+               fields = append(fields, fieldKeys[i].Marshal())
+       }
        result := &sortIterator{
-               query:     query,
+               query:     &queryNode{query, node},
+               fields:    fields,
                reader:    reader,
                sortedKey: sortedKey,
                size:      preLoadSize,
@@ -298,7 +313,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
 }
 
 func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list 
posting.List, err error) {
-       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, 
defaultRangePreloadSize)
+       iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, 
defaultRangePreloadSize, nil, nil)
        if err != nil {
                return roaring.DummyPostingList, err
        }
@@ -310,26 +325,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        return
 }
 
-func (s *store) Execute(ctx context.Context, query index.Query) (posting.List, 
error) {
-       reader, err := s.writer.Reader()
-       if err != nil {
-               return nil, err
-       }
-       documentMatchIterator, err := reader.Search(ctx, 
bluge.NewAllMatches(query.Query()))
-       if err != nil {
-               return nil, err
-       }
-       iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
-       defer func() {
-               err = multierr.Append(err, iter.Close())
-       }()
-       list := roaring.NewPostingList()
-       for iter.Next() {
-               list.Insert(iter.Val().DocID)
-       }
-       return list, err
-}
-
 func (s *store) SizeOnDisk() int64 {
        _, bytes := s.writer.DirectoryStats()
        return int64(bytes)
@@ -380,6 +375,8 @@ func (bmi *blugeMatchIterator) Next() bool {
        }
        err := match.VisitStoredFields(func(field string, value []byte) bool {
                switch field {
+               case entityField:
+                       bmi.current.EntityValues = value
                case docIDField:
                        bmi.current.DocID = convert.BytesToUint64(value)
                case seriesIDField:
diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go
index 02852397..7c3e31f2 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -33,48 +33,71 @@ import (
 
 var emptySeries = make([]index.SeriesDocument, 0)
 
-// Search implements index.SeriesStore.
-func (s *store) Search(ctx context.Context, seriesMatchers 
[]index.SeriesMatcher, projection []index.FieldKey) ([]index.SeriesDocument, 
error) {
+// BuildQuery implements index.SeriesStore.
+func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, 
secondaryQuery index.Query) (index.Query, error) {
        if len(seriesMatchers) == 0 {
-               return emptySeries, nil
-       }
-       reader, err := s.writer.Reader()
-       if err != nil {
-               return nil, err
+               return secondaryQuery, nil
        }
-       defer func() {
-               _ = reader.Close()
-       }()
+
        qs := make([]bluge.Query, len(seriesMatchers))
+       primaryNode := newShouldNode()
        for i := range seriesMatchers {
                switch seriesMatchers[i].Type {
                case index.SeriesMatcherTypeExact:
-                       q := 
bluge.NewTermQuery(convert.BytesToString(seriesMatchers[i].Match))
+                       match := convert.BytesToString(seriesMatchers[i].Match)
+                       q := bluge.NewTermQuery(match)
                        q.SetField(entityField)
                        qs[i] = q
+                       primaryNode.Append(newTermNode(match, nil))
                case index.SeriesMatcherTypePrefix:
-                       q := 
bluge.NewPrefixQuery(convert.BytesToString(seriesMatchers[i].Match))
+                       match := convert.BytesToString(seriesMatchers[i].Match)
+                       q := bluge.NewPrefixQuery(match)
                        q.SetField(entityField)
                        qs[i] = q
+                       primaryNode.Append(newPrefixNode(match, nil))
                case index.SeriesMatcherTypeWildcard:
-                       q := 
bluge.NewWildcardQuery(convert.BytesToString(seriesMatchers[i].Match))
+                       match := convert.BytesToString(seriesMatchers[i].Match)
+                       q := bluge.NewWildcardQuery(match)
                        q.SetField(entityField)
                        qs[i] = q
+                       primaryNode.Append(newWildcardNode(match, nil))
                default:
                        return nil, errors.Errorf("unsupported series matcher 
type: %v", seriesMatchers[i].Type)
                }
        }
-       var query bluge.Query
+       var primaryQuery bluge.Query
        if len(qs) > 1 {
                bq := bluge.NewBooleanQuery()
                bq.AddShould(qs...)
                bq.SetMinShould(1)
-               query = bq
+               primaryQuery = bq
        } else {
-               query = qs[0]
+               primaryQuery = qs[0]
+       }
+
+       query := bluge.NewBooleanQuery().AddMust(primaryQuery)
+       node := newMustNode()
+       node.Append(primaryNode)
+       if secondaryQuery != nil && secondaryQuery.(*queryNode).query != nil {
+               query.AddMust(secondaryQuery.(*queryNode).query)
+               node.Append(secondaryQuery.(*queryNode).node)
        }
+       return &queryNode{query, node}, nil
+}
+
+// Search implements index.SeriesStore.
+func (s *store) Search(ctx context.Context,
+       projection []index.FieldKey, query index.Query,
+) ([]index.SeriesDocument, error) {
+       reader, err := s.writer.Reader()
+       if err != nil {
+               return nil, err
+       }
+       defer func() {
+               _ = reader.Close()
+       }()
 
-       dmi, err := reader.Search(ctx, bluge.NewAllMatches(query))
+       dmi, err := reader.Search(ctx, 
bluge.NewAllMatches(query.(*queryNode).query))
        if err != nil {
                return nil, err
        }
diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go
index 03a5e181..647189d9 100644
--- a/pkg/index/inverted/inverted_series_test.go
+++ b/pkg/index/inverted/inverted_series_test.go
@@ -191,7 +191,9 @@ func TestStore_Search(t *testing.T) {
                        name += string(term) + "-"
                }
                t.Run(name, func(t *testing.T) {
-                       got, err := s.Search(context.Background(), matchers, 
tt.projection)
+                       query, err := s.BuildQuery(matchers, nil)
+                       require.NoError(t, err)
+                       got, err := s.Search(context.Background(), 
tt.projection, query)
                        require.NoError(t, err)
                        assert.Equal(t, tt.want, got)
                })
@@ -273,12 +275,14 @@ func TestStore_SearchWildcard(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(string(tt.wildcard), func(t *testing.T) {
-                       got, err := s.Search(context.Background(), 
[]index.SeriesMatcher{
+                       query, err := s.BuildQuery([]index.SeriesMatcher{
                                {
                                        Type:  index.SeriesMatcherTypeWildcard,
                                        Match: tt.wildcard,
                                },
-                       }, tt.projection)
+                       }, nil)
+                       require.NoError(t, err)
+                       got, err := s.Search(context.Background(), 
tt.projection, query)
                        require.NoError(t, err)
                        assert.ElementsMatch(t, tt.want, got)
                })
@@ -338,12 +342,14 @@ func TestStore_SearchPrefix(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(string(tt.prefix), func(t *testing.T) {
-                       got, err := s.Search(context.Background(), 
[]index.SeriesMatcher{
+                       query, err := s.BuildQuery([]index.SeriesMatcher{
                                {
                                        Type:  index.SeriesMatcherTypePrefix,
                                        Match: tt.prefix,
                                },
-                       }, tt.projection)
+                       }, nil)
+                       require.NoError(t, err)
+                       got, err := s.Search(context.Background(), 
tt.projection, query)
                        require.NoError(t, err)
                        assert.ElementsMatch(t, tt.want, got)
                })
diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go
index e3f90fda..b34fd719 100644
--- a/pkg/index/inverted/query.go
+++ b/pkg/index/inverted/query.go
@@ -24,12 +24,12 @@ import (
        "strings"
 
        "github.com/blugelabs/bluge"
-       "github.com/blugelabs/bluge/search"
        "github.com/pkg/errors"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
@@ -49,30 +49,22 @@ type GlobalIndexError struct {
 
 func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
 
-// Query is a wrapper for bluge.Query.
-type Query struct {
+var _ index.Query = (*queryNode)(nil)
+
+// queryNode is a wrapper for bluge.Query.
+type queryNode struct {
        query bluge.Query
        node
 }
 
-// Searcher implements index.Query.
-func (q *Query) Searcher(i search.Reader, options search.SearcherOptions) 
(search.Searcher, error) {
-       return q.query.Searcher(i, options)
-}
-
-func (q *Query) String() string {
+func (q *queryNode) String() string {
        return q.node.String()
 }
 
-// Query implements index.Query.
-func (q *Query) Query() bluge.Query {
-       return q.query
-}
-
 // BuildLocalQuery returns blugeQuery for local indices.
 func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, 
entityDict map[string]int,
        entity []*modelv1.TagValue,
-) (*Query, [][]*modelv1.TagValue, bool, error) {
+) (index.Query, [][]*modelv1.TagValue, bool, error) {
        if criteria == nil {
                return nil, [][]*modelv1.TagValue{entity}, false, nil
        }
@@ -117,7 +109,7 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema 
logical.Schema, entityDi
                        return nil, entities, false, nil
                }
                if leftIsMatchAllQuery && rightIsMatchAllQuery {
-                       return &Query{
+                       return &queryNode{
                                query: bluge.NewMatchAllQuery(),
                                node:  newMatchAllNode(),
                        }, entities, true, nil
@@ -126,17 +118,17 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema 
logical.Schema, entityDi
                case modelv1.LogicalExpression_LOGICAL_OP_AND:
                        query, node := bluge.NewBooleanQuery(), newMustNode()
                        if left != nil {
-                               query.AddMust(left.query)
-                               node.Append(left.node)
+                               query.AddMust(left.(*queryNode).query)
+                               node.Append(left.(*queryNode).node)
                        }
                        if right != nil {
-                               query.AddMust(right.query)
-                               node.Append(right.node)
+                               query.AddMust(right.(*queryNode).query)
+                               node.Append(right.(*queryNode).node)
                        }
-                       return &Query{query, node}, entities, false, nil
+                       return &queryNode{query, node}, entities, false, nil
                case modelv1.LogicalExpression_LOGICAL_OP_OR:
                        if leftIsMatchAllQuery || rightIsMatchAllQuery {
-                               return &Query{
+                               return &queryNode{
                                        query: bluge.NewMatchAllQuery(),
                                        node:  newMatchAllNode(),
                                }, entities, true, nil
@@ -144,14 +136,14 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema 
logical.Schema, entityDi
                        query, node := bluge.NewBooleanQuery(), newShouldNode()
                        query.SetMinShould(1)
                        if left != nil {
-                               query.AddShould(left.query)
-                               node.Append(left.node)
+                               query.AddShould(left.(*queryNode).query)
+                               node.Append(left.(*queryNode).node)
                        }
                        if right != nil {
-                               query.AddShould(right.query)
-                               node.Append(right.node)
+                               query.AddShould(right.(*queryNode).query)
+                               node.Append(right.(*queryNode).node)
                        }
-                       return &Query{query, node}, entities, false, nil
+                       return &queryNode{query, node}, entities, false, nil
                }
        }
        return nil, nil, false, logical.ErrInvalidCriteriaType
@@ -159,11 +151,11 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema 
logical.Schema, entityDi
 
 func parseConditionToQuery(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule,
        expr logical.LiteralExpr, entity []*modelv1.TagValue,
-) (*Query, [][]*modelv1.TagValue, bool, error) {
+) (*queryNode, [][]*modelv1.TagValue, bool, error) {
        field := string(convert.Uint32ToBytes(indexRule.Metadata.Id))
        b := expr.Bytes()
        if len(b) < 1 {
-               return &Query{
+               return &queryNode{
                        query: bluge.NewMatchAllQuery(),
                        node:  newMatchAllNode(),
                }, [][]*modelv1.TagValue{entity}, true, nil
@@ -173,32 +165,32 @@ func parseConditionToQuery(cond *modelv1.Condition, 
indexRule *databasev1.IndexR
        case modelv1.Condition_BINARY_OP_GT:
                query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false, 
false).SetField(field)
                node := newTermRangeInclusiveNode(str, maxInf, false, false, 
indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_GE:
                query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true, 
false).SetField(field)
                node := newTermRangeInclusiveNode(str, maxInf, true, false, 
indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_LT:
                query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, 
false).SetField(field)
                node := newTermRangeInclusiveNode(minInf, str, false, false, 
indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_LE:
                query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, 
true).SetField(field)
                node := newTermRangeInclusiveNode(minInf, str, false, true, 
indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_EQ:
                query := bluge.NewTermQuery(term).SetField(field)
                node := newTermNode(str, indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_MATCH:
                query := 
bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer])
                node := newMatchNode(str, indexRule)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_NE:
                query, node := bluge.NewBooleanQuery(), newMustNotNode()
                query.AddMustNot(bluge.NewTermQuery(term).SetField(field))
                node.SetSubNode(newTermNode(str, indexRule))
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_HAVING:
                bb, elements := expr.Bytes(), expr.Elements()
                query, node := bluge.NewBooleanQuery(), newMustNode()
@@ -208,7 +200,7 @@ func parseConditionToQuery(cond *modelv1.Condition, 
indexRule *databasev1.IndexR
                for _, e := range elements {
                        node.Append(newTermNode(e, indexRule))
                }
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                bb, elements := expr.Bytes(), expr.Elements()
                subQuery, subNode := bluge.NewBooleanQuery(), newMustNode()
@@ -221,7 +213,7 @@ func parseConditionToQuery(cond *modelv1.Condition, 
indexRule *databasev1.IndexR
                query, node := bluge.NewBooleanQuery(), newMustNotNode()
                query.AddMustNot(subQuery)
                node.SetSubNode(node)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_IN:
                bb, elements := expr.Bytes(), expr.Elements()
                query, node := bluge.NewBooleanQuery(), newShouldNode()
@@ -232,7 +224,7 @@ func parseConditionToQuery(cond *modelv1.Condition, 
indexRule *databasev1.IndexR
                for _, e := range elements {
                        node.Append(newTermNode(e, indexRule))
                }
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
                bb, elements := expr.Bytes(), expr.Elements()
                subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode()
@@ -246,7 +238,7 @@ func parseConditionToQuery(cond *modelv1.Condition, 
indexRule *databasev1.IndexR
                query, node := bluge.NewBooleanQuery(), newMustNotNode()
                query.AddMustNot(subQuery)
                node.SetSubNode(subNode)
-               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+               return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
        }
        return nil, nil, false, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses 
%v", cond)
 }
@@ -429,3 +421,53 @@ func (m *matchNode) MarshalJSON() ([]byte, error) {
 func (m *matchNode) String() string {
        return convert.JSONToString(m)
 }
+
+type prefixNode struct {
+       indexRule *databasev1.IndexRule
+       prefix    string
+}
+
+func newPrefixNode(prefix string, indexRule *databasev1.IndexRule) *prefixNode 
{
+       return &prefixNode{
+               indexRule: indexRule,
+               prefix:    prefix,
+       }
+}
+
+func (m *prefixNode) MarshalJSON() ([]byte, error) {
+       inner := make(map[string]interface{}, 1)
+       inner["index"] = m.indexRule.Metadata.Name + ":" + 
m.indexRule.Metadata.Group
+       inner["value"] = m.prefix
+       data := make(map[string]interface{}, 1)
+       data["prefix"] = inner
+       return json.Marshal(data)
+}
+
+func (m *prefixNode) String() string {
+       return convert.JSONToString(m)
+}
+
+type wildcardNode struct {
+       indexRule *databasev1.IndexRule
+       wildcard  string
+}
+
+func newWildcardNode(wildcard string, indexRule *databasev1.IndexRule) 
*wildcardNode {
+       return &wildcardNode{
+               indexRule: indexRule,
+               wildcard:  wildcard,
+       }
+}
+
+func (m *wildcardNode) MarshalJSON() ([]byte, error) {
+       inner := make(map[string]interface{}, 1)
+       inner["index"] = m.indexRule.Metadata.Name + ":" + 
m.indexRule.Metadata.Group
+       inner["value"] = m.wildcard
+       data := make(map[string]interface{}, 1)
+       data["wildcard"] = inner
+       return json.Marshal(data)
+}
+
+func (m *wildcardNode) String() string {
+       return convert.JSONToString(m)
+}
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index c63b4c42..730b9e36 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -69,7 +69,7 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey 
index.FieldKey, order mode
                sortedKey = "-" + sortedKey
        }
        result := &sortIterator{
-               query:     query,
+               query:     &queryNode{query: query},
                reader:    reader,
                sortedKey: sortedKey,
                size:      preLoadSize,
@@ -78,12 +78,13 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey 
index.FieldKey, order mode
 }
 
 type sortIterator struct {
-       query     bluge.Query
+       query     index.Query
        err       error
        reader    *bluge.Reader
        current   *blugeMatchIterator
        closer    *run.Closer
        sortedKey string
+       fields    []string
        size      int
        skipped   int
 }
@@ -109,7 +110,7 @@ func (si *sortIterator) loadCurrent() bool {
                // overflow
                size = math.MaxInt64
        }
-       topNSearch := bluge.NewTopNSearch(size, 
si.query).SortBy([]string{si.sortedKey})
+       topNSearch := bluge.NewTopNSearch(size, 
si.query.(*queryNode).query).SortBy([]string{si.sortedKey})
        if si.skipped > 0 {
                topNSearch = topNSearch.SetFrom(si.skipped)
        }
@@ -120,7 +121,7 @@ func (si *sortIterator) loadCurrent() bool {
                return false
        }
 
-       iter := newBlugeMatchIterator(documentMatchIterator, nil, nil)
+       iter := newBlugeMatchIterator(documentMatchIterator, nil, si.fields)
        si.current = &iter
        if si.next() {
                return true
@@ -159,3 +160,7 @@ func (si *sortIterator) Close() error {
        }
        return errors.Join(si.err, si.current.Close(), si.reader.Close())
 }
+
+func (si *sortIterator) Query() index.Query {
+       return si.query
+}
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 83033e3e..45e3aef2 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -288,7 +288,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, 
store SimpleStore) {
                t.Run(tt.name, func(t *testing.T) {
                        tester := assert.New(t)
                        is := require.New(t)
-                       iter, err := store.Iterator(tt.args.fieldKey, 
tt.args.termRange, tt.args.orderType, tt.preloadSize)
+                       iter, err := store.Iterator(tt.args.fieldKey, 
tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil)
                        is.NoError(err)
                        if iter == nil {
                                tester.Empty(tt.want)
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index a833023f..1d74d375 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -27,6 +27,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -114,7 +115,7 @@ var (
 )
 
 type localIndexScan struct {
-       query                *inverted.Query
+       query                index.Query
        schema               logical.Schema
        uis                  *unresolvedIndexScan
        order                *logical.OrderBy
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 20e98659..71054029 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -25,7 +25,6 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -73,7 +72,7 @@ const (
 
 // MeasureQueryOptions is the options of a measure query.
 type MeasureQueryOptions struct {
-       Query           *inverted.Query
+       Query           index.Query
        TimeRange       *timestamp.TimeRange
        Order           *OrderBy
        Name            string


Reply via email to