This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4588b55c Merge the query process of series index (#505)
4588b55c is described below
commit 4588b55cdc005521ca32d504920925c0cb9d62d1
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Aug 9 06:54:43 2024 +0800
Merge the query process of series index (#505)
* Merge queries of primary index and secondary index
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/internal/storage/index.go | 125 ++++++++-------------
banyand/internal/storage/index_test.go | 2 +-
banyand/internal/storage/storage.go | 3 +-
pkg/index/index.go | 26 +++--
pkg/index/inverted/inverted.go | 53 +++++----
pkg/index/inverted/inverted_series.go | 57 +++++++---
pkg/index/inverted/inverted_series_test.go | 16 ++-
pkg/index/inverted/query.go | 122 +++++++++++++-------
pkg/index/inverted/sort.go | 13 ++-
pkg/index/testcases/duration.go | 2 +-
.../measure/measure_plan_indexscan_local.go | 3 +-
pkg/query/model/model.go | 3 +-
12 files changed, 232 insertions(+), 193 deletions(-)
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index d5950c15..d3b059ae 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -19,6 +19,7 @@ package storage
import (
"context"
+ "maps"
"path"
"github.com/pkg/errors"
@@ -27,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
- "github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
@@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB {
}
func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series)
(pbv1.SeriesList, error) {
- sl, _, err := s.index.searchPrimary(ctx, series, nil)
+ sl, _, err := s.index.filter(ctx, series, nil, nil)
return sl, err
}
@@ -70,7 +70,9 @@ func (s *seriesIndex) Write(docs index.Documents) error {
var rangeOpts = index.RangeOpts{}
-func (s *seriesIndex) searchPrimary(ctx context.Context, series
[]*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields
FieldResultList, err error) {
+func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
+ projection []index.FieldKey, secondaryQuery index.Query,
+) (sl pbv1.SeriesList, fields FieldResultList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err =
convertEntityValuesToSeriesMatcher(series[i])
@@ -78,10 +80,14 @@ func (s *seriesIndex) searchPrimary(ctx context.Context,
series []*pbv1.Series,
return nil, nil, err
}
}
+ indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
+ if err != nil {
+ return nil, nil, err
+ }
tracer := query.GetTracer(ctx)
if tracer != nil {
- span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
- span.Tagf("matchers", "%v", seriesMatchers)
+ span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
+ span.Tagf("query", "%s", indexQuery.String())
defer func() {
span.Tagf("matched", "%d", len(sl))
if len(fields) > 0 {
@@ -93,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context,
series []*pbv1.Series,
span.Stop()
}()
}
- ss, err := s.store.Search(ctx, seriesMatchers, projection)
+ ss, err := s.store.Search(ctx, projection, indexQuery)
if err != nil {
return nil, nil, err
}
@@ -191,44 +197,19 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
span.Stop()
}()
}
- seriesList, fieldResultList, err := s.searchPrimary(ctx, series,
opts.Projection)
- if err != nil {
- return nil, nil, err
- }
- pl := seriesList.ToList()
- if opts.Query != nil {
- var plFilter posting.List
- func() {
- if tracer != nil {
- span, _ := tracer.StartSpan(ctx, "filter")
- span.Tag("exp", opts.Query.String())
- defer func() {
- if err != nil {
- span.Error(err)
- } else {
- span.Tagf("matched", "%d",
plFilter.Len())
- span.Tagf("total", "%d",
pl.Len())
- }
- span.Stop()
- }()
- }
- if plFilter, err = s.store.Execute(ctx, opts.Query);
err != nil {
- return
- }
- if plFilter == nil {
- return
- }
- err = pl.Intersect(plFilter)
- }()
+ if opts.Order == nil || opts.Order.Index == nil {
+ var seriesList pbv1.SeriesList
+ var fieldResultList FieldResultList
+ if opts.Query != nil {
+ seriesList, fieldResultList, err = s.filter(ctx,
series, opts.Projection, opts.Query)
+ } else {
+ seriesList, fieldResultList, err = s.filter(ctx,
series, opts.Projection, nil)
+ }
if err != nil {
return nil, nil, err
}
- }
-
- if opts.Order == nil || opts.Order.Index == nil {
- sl, frl = filterSeriesList(seriesList, fieldResultList, pl)
- return sl, frl, nil
+ return seriesList, fieldResultList, nil
}
fieldKey := index.FieldKey{
@@ -245,8 +226,19 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
span.Stop()
}()
}
+ seriesMatchers := make([]index.SeriesMatcher, len(series))
+ for i := range series {
+ seriesMatchers[i], err =
convertEntityValuesToSeriesMatcher(series[i])
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ query, err := s.store.BuildQuery(seriesMatchers, opts.Query)
+ if err != nil {
+ return nil, nil, err
+ }
iter, err := s.store.Iterator(fieldKey, rangeOpts,
- opts.Order.Sort, opts.PreloadSize)
+ opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
}
@@ -254,56 +246,29 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
err = multierr.Append(err, iter.Close())
}()
- var sortedSeriesList pbv1.SeriesList
- var sortedFieldResultList FieldResultList
var r int
+ result := make([]index.SeriesDocument, 0, 10)
for iter.Next() {
r++
- docID := iter.Val().DocID
- if !pl.Contains(docID) {
- continue
- }
- sortedSeriesList, sortedFieldResultList = appendSeriesList(
- sortedSeriesList, seriesList,
- sortedFieldResultList, fieldResultList,
- common.SeriesID(docID))
- if err != nil {
- return nil, nil, err
- }
+ val := iter.Val()
+ var doc index.SeriesDocument
+ doc.Fields = maps.Clone(val.Values)
+ doc.Key.ID = common.SeriesID(val.DocID)
+ doc.Key.EntityValues = val.EntityValues
+ result = append(result, doc)
+ }
+ sortedSeriesList, sortedFieldResultList, err :=
convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
+ if err != nil {
+ return nil, nil, errors.WithMessagef(err, "failed to convert
index series to series list, matchers: %v, matched: %d", seriesMatchers,
len(result))
}
if span != nil {
+ span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sortedSeriesList))
}
return sortedSeriesList, sortedFieldResultList, err
}
-func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList
FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) {
- for i := 0; i < len(seriesList); i++ {
- if !filter.Contains(uint64(seriesList[i].ID)) {
- seriesList = append(seriesList[:i], seriesList[i+1:]...)
- if fieldResultList != nil {
- fieldResultList = append(fieldResultList[:i],
fieldResultList[i+1:]...)
- }
- i--
- }
- }
- return seriesList, fieldResultList
-}
-
-func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL
FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) {
- for i := 0; i < len(src); i++ {
- if target == src[i].ID {
- dest = append(dest, src[i])
- if srcFRL != nil {
- destFRL = append(destFRL, srcFRL[i])
- }
- break
- }
- }
- return dest, destFRL
-}
-
func (s *seriesIndex) Close() error {
return s.store.Close()
}
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
index d73886a3..2b196ec7 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -157,7 +157,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries,
&seriesQuery)
}
- sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
+ sl, _, err := si.filter(ctx, seriesQueries, nil, nil)
require.NoError(t, err)
require.Equal(t, len(tt.entityValues), len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index fb7e0af6..47b82639 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -34,7 +34,6 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T
// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
- Query *inverted.Query
+ Query index.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 8234f496..a7da177b 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -24,8 +24,6 @@ import (
"fmt"
"io"
- "github.com/blugelabs/bluge"
-
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int {
// DocumentResult represents a document in an index.
type DocumentResult struct {
- Values map[string][]byte
- SortedValue []byte
- SeriesID common.SeriesID
- DocID uint64
- Timestamp int64
+ EntityValues []byte
+ Values map[string][]byte
+ SortedValue []byte
+ SeriesID common.SeriesID
+ DocID uint64
+ Timestamp int64
}
// SortedField returns the value of the sorted field.
@@ -114,6 +113,7 @@ type FieldIterator[T sort.Comparable] interface {
Next() bool
Val() T
Close() error
+ Query() Query
}
// DummyFieldIterator never iterates.
@@ -133,6 +133,10 @@ func (i *dummyIterator) Close() error {
return nil
}
+func (i *dummyIterator) Query() Query {
+ return nil
+}
+
// Document represents a document in an index.
type Document struct {
Fields []Field
@@ -156,7 +160,8 @@ type Writer interface {
// FieldIterable allows building a FieldIterator.
type FieldIterable interface {
- Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort,
preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
+ BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query)
(Query, error)
+ Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort,
preLoadSize int, query Query, fieldKeys []FieldKey) (iter
FieldIterator[*DocumentResult], err error)
Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort,
timeRange *timestamp.TimeRange, preLoadSize int)
(FieldIterator[*DocumentResult], error)
}
@@ -171,9 +176,7 @@ type Searcher interface {
// Query is an abstract of an index query.
type Query interface {
- bluge.Query
fmt.Stringer
- Query() bluge.Query
}
// Store is an abstract of an index repository.
@@ -204,8 +207,7 @@ type SeriesDocument struct {
type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
- Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument,
error)
- Execute(context.Context, Query) (posting.List, error)
+ Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
}
// SeriesMatcherType represents the type of series matcher.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 81af6e20..866aacc7 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -174,8 +174,8 @@ func (s *store) Close() error {
return s.writer.Close()
}
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
- order modelv1.Sort, preLoadSize int,
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
order modelv1.Sort,
+ preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey,
) (iter index.FieldIterator[*index.DocumentResult], err error) {
if termRange.Lower != nil &&
termRange.Upper != nil &&
@@ -191,7 +191,8 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange
index.RangeOpts,
return nil, err
}
fk := fieldKey.Marshal()
- query := bluge.NewBooleanQuery()
+ rangeQuery := bluge.NewBooleanQuery()
+ rangeNode := newMustNode()
addRange := func(query *bluge.BooleanQuery, termRange index.RangeOpts)
*bluge.BooleanQuery {
if termRange.Upper == nil {
termRange.Upper = defaultUpper
@@ -206,25 +207,39 @@ func (s *store) Iterator(fieldKey index.FieldKey,
termRange index.RangeOpts,
termRange.IncludesUpper,
).
SetField(fk))
+
rangeNode.Append(newTermRangeInclusiveNode(string(termRange.Lower),
string(termRange.Upper), termRange.IncludesLower, termRange.IncludesUpper, nil))
return query
}
if fieldKey.HasSeriesID() {
- query =
query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
+ rangeQuery =
rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
SetField(seriesIDField))
+
rangeNode.Append(newTermNode(string(fieldKey.SeriesID.Marshal()), nil))
if termRange.Lower != nil || termRange.Upper != nil {
- query = addRange(query, termRange)
+ rangeQuery = addRange(rangeQuery, termRange)
}
} else {
- query = addRange(query, termRange)
+ rangeQuery = addRange(rangeQuery, termRange)
}
sortedKey := fk
if order == modelv1.Sort_SORT_DESC {
sortedKey = "-" + sortedKey
}
+ query := bluge.NewBooleanQuery().AddMust(rangeQuery)
+ node := newMustNode()
+ node.Append(rangeNode)
+ if indexQuery != nil && indexQuery.(*queryNode).query != nil {
+ query.AddMust(indexQuery.(*queryNode).query)
+ node.Append(indexQuery.(*queryNode).node)
+ }
+ fields := make([]string, 0, len(fieldKeys))
+ for i := range fieldKeys {
+ fields = append(fields, fieldKeys[i].Marshal())
+ }
result := &sortIterator{
- query: query,
+ query: &queryNode{query, node},
+ fields: fields,
reader: reader,
sortedKey: sortedKey,
size: preLoadSize,
@@ -298,7 +313,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches
[]string) (posting.List,
}
func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list
posting.List, err error) {
- iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC,
defaultRangePreloadSize)
+ iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC,
defaultRangePreloadSize, nil, nil)
if err != nil {
return roaring.DummyPostingList, err
}
@@ -310,26 +325,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts
index.RangeOpts) (list posti
return
}
-func (s *store) Execute(ctx context.Context, query index.Query) (posting.List,
error) {
- reader, err := s.writer.Reader()
- if err != nil {
- return nil, err
- }
- documentMatchIterator, err := reader.Search(ctx,
bluge.NewAllMatches(query.Query()))
- if err != nil {
- return nil, err
- }
- iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
- defer func() {
- err = multierr.Append(err, iter.Close())
- }()
- list := roaring.NewPostingList()
- for iter.Next() {
- list.Insert(iter.Val().DocID)
- }
- return list, err
-}
-
func (s *store) SizeOnDisk() int64 {
_, bytes := s.writer.DirectoryStats()
return int64(bytes)
@@ -380,6 +375,8 @@ func (bmi *blugeMatchIterator) Next() bool {
}
err := match.VisitStoredFields(func(field string, value []byte) bool {
switch field {
+ case entityField:
+ bmi.current.EntityValues = value
case docIDField:
bmi.current.DocID = convert.BytesToUint64(value)
case seriesIDField:
diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go
index 02852397..7c3e31f2 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -33,48 +33,71 @@ import (
var emptySeries = make([]index.SeriesDocument, 0)
-// Search implements index.SeriesStore.
-func (s *store) Search(ctx context.Context, seriesMatchers
[]index.SeriesMatcher, projection []index.FieldKey) ([]index.SeriesDocument,
error) {
+// BuildQuery implements index.SeriesStore.
+func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher,
secondaryQuery index.Query) (index.Query, error) {
if len(seriesMatchers) == 0 {
- return emptySeries, nil
- }
- reader, err := s.writer.Reader()
- if err != nil {
- return nil, err
+ return secondaryQuery, nil
}
- defer func() {
- _ = reader.Close()
- }()
+
qs := make([]bluge.Query, len(seriesMatchers))
+ primaryNode := newShouldNode()
for i := range seriesMatchers {
switch seriesMatchers[i].Type {
case index.SeriesMatcherTypeExact:
- q :=
bluge.NewTermQuery(convert.BytesToString(seriesMatchers[i].Match))
+ match := convert.BytesToString(seriesMatchers[i].Match)
+ q := bluge.NewTermQuery(match)
q.SetField(entityField)
qs[i] = q
+ primaryNode.Append(newTermNode(match, nil))
case index.SeriesMatcherTypePrefix:
- q :=
bluge.NewPrefixQuery(convert.BytesToString(seriesMatchers[i].Match))
+ match := convert.BytesToString(seriesMatchers[i].Match)
+ q := bluge.NewPrefixQuery(match)
q.SetField(entityField)
qs[i] = q
+ primaryNode.Append(newPrefixNode(match, nil))
case index.SeriesMatcherTypeWildcard:
- q :=
bluge.NewWildcardQuery(convert.BytesToString(seriesMatchers[i].Match))
+ match := convert.BytesToString(seriesMatchers[i].Match)
+ q := bluge.NewWildcardQuery(match)
q.SetField(entityField)
qs[i] = q
+ primaryNode.Append(newWildcardNode(match, nil))
default:
return nil, errors.Errorf("unsupported series matcher
type: %v", seriesMatchers[i].Type)
}
}
- var query bluge.Query
+ var primaryQuery bluge.Query
if len(qs) > 1 {
bq := bluge.NewBooleanQuery()
bq.AddShould(qs...)
bq.SetMinShould(1)
- query = bq
+ primaryQuery = bq
} else {
- query = qs[0]
+ primaryQuery = qs[0]
+ }
+
+ query := bluge.NewBooleanQuery().AddMust(primaryQuery)
+ node := newMustNode()
+ node.Append(primaryNode)
+ if secondaryQuery != nil && secondaryQuery.(*queryNode).query != nil {
+ query.AddMust(secondaryQuery.(*queryNode).query)
+ node.Append(secondaryQuery.(*queryNode).node)
}
+ return &queryNode{query, node}, nil
+}
+
+// Search implements index.SeriesStore.
+func (s *store) Search(ctx context.Context,
+ projection []index.FieldKey, query index.Query,
+) ([]index.SeriesDocument, error) {
+ reader, err := s.writer.Reader()
+ if err != nil {
+ return nil, err
+ }
+ defer func() {
+ _ = reader.Close()
+ }()
- dmi, err := reader.Search(ctx, bluge.NewAllMatches(query))
+ dmi, err := reader.Search(ctx,
bluge.NewAllMatches(query.(*queryNode).query))
if err != nil {
return nil, err
}
diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go
index 03a5e181..647189d9 100644
--- a/pkg/index/inverted/inverted_series_test.go
+++ b/pkg/index/inverted/inverted_series_test.go
@@ -191,7 +191,9 @@ func TestStore_Search(t *testing.T) {
name += string(term) + "-"
}
t.Run(name, func(t *testing.T) {
- got, err := s.Search(context.Background(), matchers,
tt.projection)
+ query, err := s.BuildQuery(matchers, nil)
+ require.NoError(t, err)
+ got, err := s.Search(context.Background(),
tt.projection, query)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
@@ -273,12 +275,14 @@ func TestStore_SearchWildcard(t *testing.T) {
for _, tt := range tests {
t.Run(string(tt.wildcard), func(t *testing.T) {
- got, err := s.Search(context.Background(),
[]index.SeriesMatcher{
+ query, err := s.BuildQuery([]index.SeriesMatcher{
{
Type: index.SeriesMatcherTypeWildcard,
Match: tt.wildcard,
},
- }, tt.projection)
+ }, nil)
+ require.NoError(t, err)
+ got, err := s.Search(context.Background(),
tt.projection, query)
require.NoError(t, err)
assert.ElementsMatch(t, tt.want, got)
})
@@ -338,12 +342,14 @@ func TestStore_SearchPrefix(t *testing.T) {
for _, tt := range tests {
t.Run(string(tt.prefix), func(t *testing.T) {
- got, err := s.Search(context.Background(),
[]index.SeriesMatcher{
+ query, err := s.BuildQuery([]index.SeriesMatcher{
{
Type: index.SeriesMatcherTypePrefix,
Match: tt.prefix,
},
- }, tt.projection)
+ }, nil)
+ require.NoError(t, err)
+ got, err := s.Search(context.Background(),
tt.projection, query)
require.NoError(t, err)
assert.ElementsMatch(t, tt.want, got)
})
diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go
index e3f90fda..b34fd719 100644
--- a/pkg/index/inverted/query.go
+++ b/pkg/index/inverted/query.go
@@ -24,12 +24,12 @@ import (
"strings"
"github.com/blugelabs/bluge"
- "github.com/blugelabs/bluge/search"
"github.com/pkg/errors"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -49,30 +49,22 @@ type GlobalIndexError struct {
func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
-// Query is a wrapper for bluge.Query.
-type Query struct {
+var _ index.Query = (*queryNode)(nil)
+
+// queryNode is a wrapper for bluge.Query.
+type queryNode struct {
query bluge.Query
node
}
-// Searcher implements index.Query.
-func (q *Query) Searcher(i search.Reader, options search.SearcherOptions)
(search.Searcher, error) {
- return q.query.Searcher(i, options)
-}
-
-func (q *Query) String() string {
+func (q *queryNode) String() string {
return q.node.String()
}
-// Query implements index.Query.
-func (q *Query) Query() bluge.Query {
- return q.query
-}
-
// BuildLocalQuery returns blugeQuery for local indices.
func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema,
entityDict map[string]int,
entity []*modelv1.TagValue,
-) (*Query, [][]*modelv1.TagValue, bool, error) {
+) (index.Query, [][]*modelv1.TagValue, bool, error) {
if criteria == nil {
return nil, [][]*modelv1.TagValue{entity}, false, nil
}
@@ -117,7 +109,7 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema
logical.Schema, entityDi
return nil, entities, false, nil
}
if leftIsMatchAllQuery && rightIsMatchAllQuery {
- return &Query{
+ return &queryNode{
query: bluge.NewMatchAllQuery(),
node: newMatchAllNode(),
}, entities, true, nil
@@ -126,17 +118,17 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema
logical.Schema, entityDi
case modelv1.LogicalExpression_LOGICAL_OP_AND:
query, node := bluge.NewBooleanQuery(), newMustNode()
if left != nil {
- query.AddMust(left.query)
- node.Append(left.node)
+ query.AddMust(left.(*queryNode).query)
+ node.Append(left.(*queryNode).node)
}
if right != nil {
- query.AddMust(right.query)
- node.Append(right.node)
+ query.AddMust(right.(*queryNode).query)
+ node.Append(right.(*queryNode).node)
}
- return &Query{query, node}, entities, false, nil
+ return &queryNode{query, node}, entities, false, nil
case modelv1.LogicalExpression_LOGICAL_OP_OR:
if leftIsMatchAllQuery || rightIsMatchAllQuery {
- return &Query{
+ return &queryNode{
query: bluge.NewMatchAllQuery(),
node: newMatchAllNode(),
}, entities, true, nil
@@ -144,14 +136,14 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema
logical.Schema, entityDi
query, node := bluge.NewBooleanQuery(), newShouldNode()
query.SetMinShould(1)
if left != nil {
- query.AddShould(left.query)
- node.Append(left.node)
+ query.AddShould(left.(*queryNode).query)
+ node.Append(left.(*queryNode).node)
}
if right != nil {
- query.AddShould(right.query)
- node.Append(right.node)
+ query.AddShould(right.(*queryNode).query)
+ node.Append(right.(*queryNode).node)
}
- return &Query{query, node}, entities, false, nil
+ return &queryNode{query, node}, entities, false, nil
}
}
return nil, nil, false, logical.ErrInvalidCriteriaType
@@ -159,11 +151,11 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema
logical.Schema, entityDi
func parseConditionToQuery(cond *modelv1.Condition, indexRule
*databasev1.IndexRule,
expr logical.LiteralExpr, entity []*modelv1.TagValue,
-) (*Query, [][]*modelv1.TagValue, bool, error) {
+) (*queryNode, [][]*modelv1.TagValue, bool, error) {
field := string(convert.Uint32ToBytes(indexRule.Metadata.Id))
b := expr.Bytes()
if len(b) < 1 {
- return &Query{
+ return &queryNode{
query: bluge.NewMatchAllQuery(),
node: newMatchAllNode(),
}, [][]*modelv1.TagValue{entity}, true, nil
@@ -173,32 +165,32 @@ func parseConditionToQuery(cond *modelv1.Condition,
indexRule *databasev1.IndexR
case modelv1.Condition_BINARY_OP_GT:
query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false,
false).SetField(field)
node := newTermRangeInclusiveNode(str, maxInf, false, false,
indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_GE:
query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true,
false).SetField(field)
node := newTermRangeInclusiveNode(str, maxInf, true, false,
indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_LT:
query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false,
false).SetField(field)
node := newTermRangeInclusiveNode(minInf, str, false, false,
indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_LE:
query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false,
true).SetField(field)
node := newTermRangeInclusiveNode(minInf, str, false, true,
indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_EQ:
query := bluge.NewTermQuery(term).SetField(field)
node := newTermNode(str, indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_MATCH:
query :=
bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer])
node := newMatchNode(str, indexRule)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_NE:
query, node := bluge.NewBooleanQuery(), newMustNotNode()
query.AddMustNot(bluge.NewTermQuery(term).SetField(field))
node.SetSubNode(newTermNode(str, indexRule))
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_HAVING:
bb, elements := expr.Bytes(), expr.Elements()
query, node := bluge.NewBooleanQuery(), newMustNode()
@@ -208,7 +200,7 @@ func parseConditionToQuery(cond *modelv1.Condition,
indexRule *databasev1.IndexR
for _, e := range elements {
node.Append(newTermNode(e, indexRule))
}
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_NOT_HAVING:
bb, elements := expr.Bytes(), expr.Elements()
subQuery, subNode := bluge.NewBooleanQuery(), newMustNode()
@@ -221,7 +213,7 @@ func parseConditionToQuery(cond *modelv1.Condition,
indexRule *databasev1.IndexR
query, node := bluge.NewBooleanQuery(), newMustNotNode()
query.AddMustNot(subQuery)
node.SetSubNode(node)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_IN:
bb, elements := expr.Bytes(), expr.Elements()
query, node := bluge.NewBooleanQuery(), newShouldNode()
@@ -232,7 +224,7 @@ func parseConditionToQuery(cond *modelv1.Condition,
indexRule *databasev1.IndexR
for _, e := range elements {
node.Append(newTermNode(e, indexRule))
}
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
case modelv1.Condition_BINARY_OP_NOT_IN:
bb, elements := expr.Bytes(), expr.Elements()
subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode()
@@ -246,7 +238,7 @@ func parseConditionToQuery(cond *modelv1.Condition,
indexRule *databasev1.IndexR
query, node := bluge.NewBooleanQuery(), newMustNotNode()
query.AddMustNot(subQuery)
node.SetSubNode(subNode)
- return &Query{query, node}, [][]*modelv1.TagValue{entity},
false, nil
+ return &queryNode{query, node}, [][]*modelv1.TagValue{entity},
false, nil
}
return nil, nil, false,
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses
%v", cond)
}
@@ -429,3 +421,53 @@ func (m *matchNode) MarshalJSON() ([]byte, error) {
func (m *matchNode) String() string {
return convert.JSONToString(m)
}
+
+type prefixNode struct {
+ indexRule *databasev1.IndexRule
+ prefix string
+}
+
+func newPrefixNode(prefix string, indexRule *databasev1.IndexRule) *prefixNode
{
+ return &prefixNode{
+ indexRule: indexRule,
+ prefix: prefix,
+ }
+}
+
+func (m *prefixNode) MarshalJSON() ([]byte, error) {
+ inner := make(map[string]interface{}, 1)
+ inner["index"] = m.indexRule.Metadata.Name + ":" +
m.indexRule.Metadata.Group
+ inner["value"] = m.prefix
+ data := make(map[string]interface{}, 1)
+ data["prefix"] = inner
+ return json.Marshal(data)
+}
+
+func (m *prefixNode) String() string {
+ return convert.JSONToString(m)
+}
+
+type wildcardNode struct {
+ indexRule *databasev1.IndexRule
+ wildcard string
+}
+
+func newWildcardNode(wildcard string, indexRule *databasev1.IndexRule)
*wildcardNode {
+ return &wildcardNode{
+ indexRule: indexRule,
+ wildcard: wildcard,
+ }
+}
+
+func (m *wildcardNode) MarshalJSON() ([]byte, error) {
+ inner := make(map[string]interface{}, 1)
+ inner["index"] = m.indexRule.Metadata.Name + ":" +
m.indexRule.Metadata.Group
+ inner["value"] = m.wildcard
+ data := make(map[string]interface{}, 1)
+ data["wildcard"] = inner
+ return json.Marshal(data)
+}
+
+func (m *wildcardNode) String() string {
+ return convert.JSONToString(m)
+}
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index c63b4c42..730b9e36 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -69,7 +69,7 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey
index.FieldKey, order mode
sortedKey = "-" + sortedKey
}
result := &sortIterator{
- query: query,
+ query: &queryNode{query: query},
reader: reader,
sortedKey: sortedKey,
size: preLoadSize,
@@ -78,12 +78,13 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey
index.FieldKey, order mode
}
type sortIterator struct {
- query bluge.Query
+ query index.Query
err error
reader *bluge.Reader
current *blugeMatchIterator
closer *run.Closer
sortedKey string
+ fields []string
size int
skipped int
}
@@ -109,7 +110,7 @@ func (si *sortIterator) loadCurrent() bool {
// overflow
size = math.MaxInt64
}
- topNSearch := bluge.NewTopNSearch(size,
si.query).SortBy([]string{si.sortedKey})
+ topNSearch := bluge.NewTopNSearch(size,
si.query.(*queryNode).query).SortBy([]string{si.sortedKey})
if si.skipped > 0 {
topNSearch = topNSearch.SetFrom(si.skipped)
}
@@ -120,7 +121,7 @@ func (si *sortIterator) loadCurrent() bool {
return false
}
- iter := newBlugeMatchIterator(documentMatchIterator, nil, nil)
+ iter := newBlugeMatchIterator(documentMatchIterator, nil, si.fields)
si.current = &iter
if si.next() {
return true
@@ -159,3 +160,7 @@ func (si *sortIterator) Close() error {
}
return errors.Join(si.err, si.current.Close(), si.reader.Close())
}
+
+func (si *sortIterator) Query() index.Query {
+ return si.query
+}
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 83033e3e..45e3aef2 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -288,7 +288,7 @@ func RunDuration(t *testing.T, data map[int]posting.List,
store SimpleStore) {
t.Run(tt.name, func(t *testing.T) {
tester := assert.New(t)
is := require.New(t)
- iter, err := store.Iterator(tt.args.fieldKey,
tt.args.termRange, tt.args.orderType, tt.preloadSize)
+ iter, err := store.Iterator(tt.args.fieldKey,
tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil)
is.NoError(err)
if iter == nil {
tester.Empty(tt.want)
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index a833023f..1d74d375 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -27,6 +27,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -114,7 +115,7 @@ var (
)
type localIndexScan struct {
- query *inverted.Query
+ query index.Query
schema logical.Schema
uis *unresolvedIndexScan
order *logical.OrderBy
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 20e98659..71054029 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -25,7 +25,6 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -73,7 +72,7 @@ const (
// MeasureQueryOptions is the options of a measure query.
type MeasureQueryOptions struct {
- Query *inverted.Query
+ Query index.Query
TimeRange *timestamp.TimeRange
Order *OrderBy
Name string