This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 198129c6 Optimize query performance of series index (#491)
198129c6 is described below
commit 198129c6901cb4cb5bf123aa19651ab701837736
Author: Huang Youliang <[email protected]>
AuthorDate: Sat Aug 3 15:16:10 2024 +0800
Optimize query performance of series index (#491)
---
CHANGES.md | 1 +
banyand/internal/storage/index.go | 23 +-
banyand/internal/storage/storage.go | 3 +-
banyand/measure/query.go | 2 +-
banyand/stream/query.go | 4 +-
.../logical/interface.go => convert/json.go} | 45 +--
pkg/index/index.go | 22 +-
pkg/index/inverted/inverted.go | 35 +-
pkg/index/inverted/query.go | 431 +++++++++++++++++++++
pkg/query/logical/common.go | 17 +-
pkg/query/logical/expr.go | 10 +
pkg/query/logical/expr_literal.go | 73 +++-
pkg/query/logical/interface.go | 1 +
.../measure/measure_plan_indexscan_local.go | 12 +-
pkg/query/logical/parser.go | 159 ++++++++
pkg/query/logical/{ => stream}/index_filter.go | 220 ++---------
pkg/query/logical/stream/stream_plan_tag_filter.go | 2 +-
pkg/query/logical/tag_filter.go | 19 +-
pkg/query/model/model.go | 3 +-
19 files changed, 790 insertions(+), 292 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 50884405..ba07da6f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
- Add the stream query trace.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
+- Optimize query performance of series index.
### Bugs
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index b8cd31cb..d5950c15 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -20,20 +20,17 @@ package storage
import (
"context"
"path"
- "strings"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
- databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
- "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
func (s *segment[T, O]) IndexDB() IndexDB {
@@ -200,24 +197,12 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
}
pl := seriesList.ToList()
- if opts.Filter != nil && opts.Filter != logical.ENode {
+ if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
- span.Tag("exp", opts.Filter.String())
- var projectionStrBuilder strings.Builder
- if len(opts.Projection) > 0 {
- projectionStrBuilder.WriteString("[")
- for i, p := range opts.Projection {
- if i > 0 {
-
projectionStrBuilder.WriteString(", ")
- }
-
projectionStrBuilder.WriteRune(rune(p.IndexRuleID))
- }
- projectionStrBuilder.WriteString("]")
- span.Tagf("projection", "%s",
projectionStrBuilder.String())
- }
+ span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
@@ -228,9 +213,7 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
span.Stop()
}()
}
- if plFilter, err = opts.Filter.Execute(func(_
databasev1.IndexRule_Type) (index.Searcher, error) {
- return s.store, nil
- }, 0); err != nil {
+ if plFilter, err = s.store.Execute(ctx, opts.Query);
err != nil {
return
}
if plFilter == nil {
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index 1ccf03d7..fb7e0af6 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -34,6 +34,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -66,7 +67,7 @@ type SupplyTSDB[T TSTable] func() T
// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
- Filter index.Filter
+ Query *inverted.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ae37521a..fa94091e 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -194,7 +194,7 @@ func (s *measure) searchSeriesList(ctx context.Context,
series []*pbv1.Series, m
seriesFilter := roaring.NewPostingList()
for i := range segments {
sll, fieldResultList, err := segments[i].IndexDB().Search(ctx,
series, storage.IndexSearchOpts{
- Filter: mqo.Filter,
+ Query: mqo.Query,
Order: mqo.Order,
PreloadSize: preloadSize,
Projection: indexProjection,
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 9dff205f..0a5ebdc3 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -37,7 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
- "github.com/apache/skywalking-banyandb/pkg/query/logical"
+ logicalstream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
@@ -523,7 +523,7 @@ func (qr *queryResult) mergeByTimestamp()
*model.StreamResult {
func indexSearch(sqo model.StreamQueryOptions,
tabs []*tsTable, seriesList pbv1.SeriesList,
) (posting.List, error) {
- if sqo.Filter == nil || sqo.Filter == logical.ENode {
+ if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
return nil, nil
}
result := roaring.NewPostingList()
diff --git a/pkg/query/logical/interface.go b/pkg/convert/json.go
similarity index 50%
copy from pkg/query/logical/interface.go
copy to pkg/convert/json.go
index dc002cad..22b250d9 100644
--- a/pkg/query/logical/interface.go
+++ b/pkg/convert/json.go
@@ -15,42 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package logical
+package convert
-import (
- "fmt"
-)
+import "encoding/json"
-// UnresolvedPlan denotes an logical expression.
-// It could be analyzed to a Plan(executable operation) with the Schema.
-type UnresolvedPlan interface {
- Analyze(Schema) (Plan, error)
-}
-
-// Plan is the executable operation. It belongs to a execution tree.
-type Plan interface {
- fmt.Stringer
- Children() []Plan
- Schema() Schema
-}
-
-// Expr represents a predicate in criteria.
-type Expr interface {
- fmt.Stringer
- DataType() int32
- Equal(Expr) bool
-}
-
-// LiteralExpr allows getting raw data represented as bytes.
-type LiteralExpr interface {
- Expr
- Bytes() [][]byte
-}
-
-// ComparableExpr allows comparing Expr and Expr arrays.
-type ComparableExpr interface {
- LiteralExpr
- Compare(LiteralExpr) (int, bool)
- BelongTo(LiteralExpr) bool
- Contains(LiteralExpr) bool
+// JSONToString converts a JSON marshaler to its JSON string representation.
+// If MarshalJSON fails, the error's message is returned in place of the JSON,
+// so the result is always printable.
+func JSONToString(marshaler json.Marshaler) string {
+ bb, err := marshaler.MarshalJSON()
+ if err != nil {
+ return err.Error()
+ }
+ return string(bb)
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 5d5bf91e..8234f496 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -24,6 +24,8 @@ import (
"fmt"
"io"
+ "github.com/blugelabs/bluge"
+
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -93,7 +95,7 @@ func (r RangeOpts) Between(value []byte) int {
return 0
}
-// DocumentResult represents a document in a index.
+// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
@@ -131,7 +133,7 @@ func (i *dummyIterator) Close() error {
return nil
}
-// Document represents a document in a index.
+// Document represents a document in an index.
type Document struct {
Fields []Field
EntityValues []byte
@@ -147,7 +149,7 @@ type Batch struct {
Documents Documents
}
-// Writer allows writing fields and docID in a document to a index.
+// Writer allows writing fields and docID in a document to an index.
type Writer interface {
Batch(batch Batch) error
}
@@ -167,7 +169,14 @@ type Searcher interface {
Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
}
-// Store is an abstract of a index repository.
+// Query is an abstract of an index query.
+// It is itself a bluge.Query and a fmt.Stringer, and additionally exposes
+// the underlying bluge.Query through the Query method.
+type Query interface {
+ bluge.Query
+ fmt.Stringer
+ // Query returns the bluge.Query that is handed to the index reader.
+ Query() bluge.Query
+}
+
+// Store is an abstract of an index repository.
type Store interface {
io.Closer
Writer
@@ -175,7 +184,7 @@ type Store interface {
SizeOnDisk() int64
}
-// Series represents a series in a index.
+// Series represents a series in an index.
type Series struct {
EntityValues []byte
ID common.SeriesID
@@ -185,7 +194,7 @@ func (s Series) String() string {
return fmt.Sprintf("%s:%d", s.EntityValues, s.ID)
}
-// SeriesDocument represents a series document in a index.
+// SeriesDocument represents a series document in an index.
type SeriesDocument struct {
Fields map[string][]byte
Key Series
@@ -196,6 +205,7 @@ type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument,
error)
+ Execute(context.Context, Query) (posting.List, error)
}
// SeriesMatcherType represents the type of series matcher.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4d831fb2..a481b7f3 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-// Package inverted implements a inverted index repository.
+// Package inverted implements an inverted index repository.
package inverted
import (
@@ -62,10 +62,11 @@ var (
defaultProjection = []string{docIDField}
)
-var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
+// Analyzers is a map that associates each IndexRule_Analyzer type with a
corresponding Analyzer.
+var Analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
func init() {
- analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
+ Analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
databasev1.IndexRule_ANALYZER_KEYWORD:
analyzer.NewKeywordAnalyzer(),
databasev1.IndexRule_ANALYZER_SIMPLE:
analyzer.NewSimpleAnalyzer(),
databasev1.IndexRule_ANALYZER_STANDARD:
analyzer.NewStandardAnalyzer(),
@@ -74,7 +75,7 @@ func init() {
var _ index.Store = (*store)(nil)
-// StoreOpts wraps options to create a inverted index repository.
+// StoreOpts wraps options to create an inverted index repository.
type StoreOpts struct {
Logger *logger.Logger
Path string
@@ -124,7 +125,7 @@ func (s *store) Batch(batch index.Batch) error {
tf.StoreValue()
}
if f.Key.Analyzer !=
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
- tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer])
+ tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
}
@@ -153,7 +154,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000))
}
config := bluge.DefaultConfigWithIndexConfig(indexConfig)
- config.DefaultSearchAnalyzer =
analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
+ config.DefaultSearchAnalyzer =
Analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
w, err := bluge.OpenWriter(config)
if err != nil {
@@ -271,7 +272,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches
[]string) (posting.List,
if err != nil {
return nil, err
}
- analyzer := analyzers[fieldKey.Analyzer]
+ analyzer := Analyzers[fieldKey.Analyzer]
fk := fieldKey.Marshal()
query := bluge.NewBooleanQuery()
if fieldKey.HasSeriesID() {
@@ -309,6 +310,26 @@ func (s *store) Range(fieldKey index.FieldKey, opts
index.RangeOpts) (list posti
return
}
+// Execute runs query against a reader obtained from the store's writer and
+// collects the DocID of every matching document into a posting list.
+//
+// The results are named on purpose: the deferred iterator Close appends its
+// error to err, and only a named result lets that assignment reach the caller.
+func (s *store) Execute(ctx context.Context, query index.Query) (list posting.List, err error) {
+	reader, err := s.writer.Reader()
+	if err != nil {
+		return nil, err
+	}
+	documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query()))
+	if err != nil {
+		return nil, err
+	}
+	iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
+	defer func() {
+		// Surface a failure to close the iterator instead of dropping it.
+		err = multierr.Append(err, iter.Close())
+	}()
+	list = roaring.NewPostingList()
+	for iter.Next() {
+		list.Insert(iter.Val().DocID)
+	}
+	return list, err
+}
+
func (s *store) SizeOnDisk() int64 {
_, bytes := s.writer.DirectoryStats()
return int64(bytes)
diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go
new file mode 100644
index 00000000..e3f90fda
--- /dev/null
+++ b/pkg/index/inverted/query.go
@@ -0,0 +1,431 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "strings"
+
+ "github.com/blugelabs/bluge"
+ "github.com/blugelabs/bluge/search"
+ "github.com/pkg/errors"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var (
+ // minTerm and maxTerm are the byte encodings of math.MinInt64 and
+ // math.MaxInt64; they stand in for the open end of one-sided range queries.
+ minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0])
+ maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0])
+ // minInf and maxInf are the printable placeholders used for those open
+ // ends when a range node is rendered as a string.
+ minInf = "-inf"
+ maxInf = "+inf"
+)
+
+// GlobalIndexError reports that an index rule is "global" and therefore
+// cannot be handled by the local filter.
+type GlobalIndexError struct {
+ IndexRule *databasev1.IndexRule
+ Expr logical.LiteralExpr
+}
+
+// Error returns the string form of the offending index rule.
+func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
+
+// Query is a wrapper for bluge.Query.
+// It pairs the executable bluge query with a node that describes the same
+// query in printable form.
+type Query struct {
+ query bluge.Query
+ node
+}
+
+// Searcher implements index.Query.
+// It delegates to the wrapped bluge query.
+func (q *Query) Searcher(i search.Reader, options search.SearcherOptions)
(search.Searcher, error) {
+ return q.query.Searcher(i, options)
+}
+
+// String returns the string form of the descriptive node.
+func (q *Query) String() string {
+ return q.node.String()
+}
+
+// Query implements index.Query.
+// It returns the wrapped bluge query.
+func (q *Query) Query() bluge.Query {
+ return q.query
+}
+
+// BuildLocalQuery converts criteria into a bluge-backed Query for local
+// indices, recursing through logical expressions.
+//
+// It returns, in order: the query (nil when criteria is nil or when the
+// criteria resolves to entity values only), the entity combinations derived
+// from the criteria, a flag reporting whether the returned query is a
+// match-all query, and an error. A condition on a tag that has no index rule
+// yields an error wrapping logical.ErrUnsupportedConditionOp.
+func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema,
entityDict map[string]int,
+ entity []*modelv1.TagValue,
+) (*Query, [][]*modelv1.TagValue, bool, error) {
+ if criteria == nil {
+ return nil, [][]*modelv1.TagValue{entity}, false, nil
+ }
+ switch criteria.GetExp().(type) {
+ case *modelv1.Criteria_Condition:
+ cond := criteria.GetCondition()
+ expr, parsedEntity, err :=
logical.ParseExprOrEntity(entityDict, entity, cond)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ // The condition was consumed as an entity value, so no query is needed.
+ if parsedEntity != nil {
+ return nil, parsedEntity, false, nil
+ }
+ if ok, indexRule := schema.IndexDefined(cond.Name); ok {
+ return parseConditionToQuery(cond, indexRule, expr,
entity)
+ }
+ return nil, nil, false,
errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s",
cond)
+ case *modelv1.Criteria_Le:
+ le := criteria.GetLe()
+ if le.GetLeft() == nil && le.GetRight() == nil {
+ return nil, nil, false,
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and
right) of [%v] are empty", criteria)
+ }
+ // With only one side present, the expression reduces to that side.
+ if le.GetLeft() == nil {
+ return BuildLocalQuery(le.Right, schema, entityDict,
entity)
+ }
+ if le.GetRight() == nil {
+ return BuildLocalQuery(le.Left, schema, entityDict,
entity)
+ }
+ left, leftEntities, leftIsMatchAllQuery, err :=
BuildLocalQuery(le.Left, schema, entityDict, entity)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ right, rightEntities, rightIsMatchAllQuery, err :=
BuildLocalQuery(le.Right, schema, entityDict, entity)
+ if err != nil {
+ return nil, nil, false, err
+ }
+ entities := logical.ParseEntities(le.Op, entity, leftEntities,
rightEntities)
+ if entities == nil {
+ return nil, nil, false, nil
+ }
+ if left == nil && right == nil {
+ return nil, entities, false, nil
+ }
+ // Two match-all operands collapse into a single match-all query.
+ if leftIsMatchAllQuery && rightIsMatchAllQuery {
+ return &Query{
+ query: bluge.NewMatchAllQuery(),
+ node: newMatchAllNode(),
+ }, entities, true, nil
+ }
+ switch le.Op {
+ case modelv1.LogicalExpression_LOGICAL_OP_AND:
+ query, node := bluge.NewBooleanQuery(), newMustNode()
+ if left != nil {
+ query.AddMust(left.query)
+ node.Append(left.node)
+ }
+ if right != nil {
+ query.AddMust(right.query)
+ node.Append(right.node)
+ }
+ return &Query{query, node}, entities, false, nil
+ case modelv1.LogicalExpression_LOGICAL_OP_OR:
+ // OR with a match-all operand is itself match-all.
+ if leftIsMatchAllQuery || rightIsMatchAllQuery {
+ return &Query{
+ query: bluge.NewMatchAllQuery(),
+ node: newMatchAllNode(),
+ }, entities, true, nil
+ }
+ query, node := bluge.NewBooleanQuery(), newShouldNode()
+ query.SetMinShould(1)
+ if left != nil {
+ query.AddShould(left.query)
+ node.Append(left.node)
+ }
+ if right != nil {
+ query.AddShould(right.query)
+ node.Append(right.node)
+ }
+ return &Query{query, node}, entities, false, nil
+ }
+ }
+ // Reached for an unknown criteria variant or an unhandled logical operator.
+ return nil, nil, false, logical.ErrInvalidCriteriaType
+}
+
+// parseConditionToQuery translates a single condition on an indexed tag into
+// a bluge query plus a printable node that describes it.
+//
+// It returns the query, the entity list (always just the incoming entity), a
+// flag that is true only when the result is a match-all query (the literal
+// carries no bytes), and an error wrapping logical.ErrUnsupportedConditionOp
+// for operators that are not handled here.
+func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexRule,
+	expr logical.LiteralExpr, entity []*modelv1.TagValue,
+) (*Query, [][]*modelv1.TagValue, bool, error) {
+	// The encoded index rule ID is used as the field name for every sub-query.
+	field := string(convert.Uint32ToBytes(indexRule.Metadata.Id))
+	b := expr.Bytes()
+	if len(b) < 1 {
+		// Nothing to compare against: match every document.
+		return &Query{
+			query: bluge.NewMatchAllQuery(),
+			node:  newMatchAllNode(),
+		}, [][]*modelv1.TagValue{entity}, true, nil
+	}
+	term, str := string(b[0]), expr.String()
+	switch cond.Op {
+	case modelv1.Condition_BINARY_OP_GT:
+		query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false, false).SetField(field)
+		node := newTermRangeInclusiveNode(str, maxInf, false, false, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_GE:
+		query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true, false).SetField(field)
+		node := newTermRangeInclusiveNode(str, maxInf, true, false, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_LT:
+		query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, false).SetField(field)
+		node := newTermRangeInclusiveNode(minInf, str, false, false, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_LE:
+		query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, true).SetField(field)
+		node := newTermRangeInclusiveNode(minInf, str, false, true, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_EQ:
+		query := bluge.NewTermQuery(term).SetField(field)
+		node := newTermNode(str, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_MATCH:
+		query := bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer])
+		node := newMatchNode(str, indexRule)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_NE:
+		query, node := bluge.NewBooleanQuery(), newMustNotNode()
+		query.AddMustNot(bluge.NewTermQuery(term).SetField(field))
+		node.SetSubNode(newTermNode(str, indexRule))
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_HAVING:
+		// Every listed value must be present.
+		bb, elements := expr.Bytes(), expr.Elements()
+		query, node := bluge.NewBooleanQuery(), newMustNode()
+		for _, b := range bb {
+			query.AddMust(bluge.NewTermQuery(string(b)).SetField(field))
+		}
+		for _, e := range elements {
+			node.Append(newTermNode(e, indexRule))
+		}
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_NOT_HAVING:
+		// Negation of HAVING: wrap the "all values present" query in must-not.
+		bb, elements := expr.Bytes(), expr.Elements()
+		subQuery, subNode := bluge.NewBooleanQuery(), newMustNode()
+		for _, b := range bb {
+			subQuery.AddMust(bluge.NewTermQuery(string(b)).SetField(field))
+		}
+		for _, e := range elements {
+			subNode.Append(newTermNode(e, indexRule))
+		}
+		query, node := bluge.NewBooleanQuery(), newMustNotNode()
+		query.AddMustNot(subQuery)
+		// Attach the inner node, not the must-not node itself; a self-reference
+		// would make String/MarshalJSON recurse without end.
+		node.SetSubNode(subNode)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_IN:
+		// At least one listed value must be present.
+		bb, elements := expr.Bytes(), expr.Elements()
+		query, node := bluge.NewBooleanQuery(), newShouldNode()
+		query.SetMinShould(1)
+		for _, b := range bb {
+			query.AddShould(bluge.NewTermQuery(string(b)).SetField(field))
+		}
+		for _, e := range elements {
+			node.Append(newTermNode(e, indexRule))
+		}
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	case modelv1.Condition_BINARY_OP_NOT_IN:
+		// Negation of IN: wrap the "any value present" query in must-not.
+		bb, elements := expr.Bytes(), expr.Elements()
+		subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode()
+		subQuery.SetMinShould(1)
+		for _, b := range bb {
+			subQuery.AddShould(bluge.NewTermQuery(string(b)).SetField(field))
+		}
+		for _, e := range elements {
+			subNode.Append(newTermNode(e, indexRule))
+		}
+		query, node := bluge.NewBooleanQuery(), newMustNotNode()
+		query.AddMustNot(subQuery)
+		node.SetSubNode(subNode)
+		return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil
+	}
+	return nil, nil, false, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v", cond)
+}
+
+// node is the printable description of a query; it only needs to render
+// itself as a string.
+type node interface {
+ fmt.Stringer
+}
+
+// mustNode describes a conjunction: a list of sub-nodes rendered under the
+// "must" key.
+type mustNode struct {
+ subNodes []node
+}
+
+// newMustNode returns a mustNode with an empty, non-nil sub-node list.
+func newMustNode() *mustNode {
+ return &mustNode{
+ subNodes: make([]node, 0),
+ }
+}
+
+// Append adds subNode to the end of the sub-node list.
+func (m *mustNode) Append(subNode node) {
+ m.subNodes = append(m.subNodes, subNode)
+}
+
+// MarshalJSON encodes the node as {"must": [sub-nodes...]}.
+func (m *mustNode) MarshalJSON() ([]byte, error) {
+ data := make(map[string]interface{}, 1)
+ data["must"] = m.subNodes
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (m *mustNode) String() string {
+ return convert.JSONToString(m)
+}
+
+// shouldNode describes a disjunction: a list of sub-nodes rendered under the
+// "should" key.
+type shouldNode struct {
+ subNodes []node
+}
+
+// newShouldNode returns a shouldNode with an empty, non-nil sub-node list.
+func newShouldNode() *shouldNode {
+ return &shouldNode{
+ subNodes: make([]node, 0),
+ }
+}
+
+// Append adds subNode to the end of the sub-node list.
+func (s *shouldNode) Append(subNode node) {
+ s.subNodes = append(s.subNodes, subNode)
+}
+
+// MarshalJSON encodes the node as {"should": [sub-nodes...]}.
+func (s *shouldNode) MarshalJSON() ([]byte, error) {
+ data := make(map[string]interface{}, 1)
+ data["should"] = s.subNodes
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (s *shouldNode) String() string {
+ return convert.JSONToString(s)
+}
+
+// mustNotNode describes a negation: a single sub-node rendered under the
+// "mustNot" key.
+type mustNotNode struct {
+ subNode node
+}
+
+// newMustNotNode returns a mustNotNode whose sub-node is not yet set.
+func newMustNotNode() *mustNotNode {
+ return &mustNotNode{}
+}
+
+// SetSubNode sets the node being negated.
+func (m *mustNotNode) SetSubNode(subNode node) {
+ m.subNode = subNode
+}
+
+// MarshalJSON encodes the node as {"mustNot": sub-node}.
+func (m *mustNotNode) MarshalJSON() ([]byte, error) {
+ data := make(map[string]interface{}, 1)
+ data["mustNot"] = m.subNode
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (m *mustNotNode) String() string {
+ return convert.JSONToString(m)
+}
+
+// matchAllNode is the printable counterpart of a match-all query.
+type matchAllNode struct{}
+
+// newMatchAllNode returns a new matchAllNode.
+func newMatchAllNode() *matchAllNode {
+	return &matchAllNode{}
+}
+
+// MarshalJSON encodes the node as the JSON string "matchAll". Without it, a
+// match-all node nested in another node's sub-node list would be encoded as
+// an empty object and disappear from the rendered query.
+func (m *matchAllNode) MarshalJSON() ([]byte, error) {
+	return json.Marshal(m.String())
+}
+
+// String returns "matchAll".
+func (m *matchAllNode) String() string {
+	return "matchAll"
+}
+
+// termRangeInclusiveNode describes a term range query: its two bounds,
+// whether each bound is inclusive, and the index rule it targets.
+type termRangeInclusiveNode struct {
+ indexRule *databasev1.IndexRule
+ min string
+ max string
+ minInclusive bool
+ maxInclusive bool
+}
+
+// newTermRangeInclusiveNode returns a termRangeInclusiveNode holding the
+// given bounds, inclusiveness flags and index rule.
+func newTermRangeInclusiveNode(min, max string, minInclusive, maxInclusive
bool, indexRule *databasev1.IndexRule) *termRangeInclusiveNode {
+ return &termRangeInclusiveNode{
+ indexRule: indexRule,
+ min: min,
+ max: max,
+ minInclusive: minInclusive,
+ maxInclusive: maxInclusive,
+ }
+}
+
+// MarshalJSON encodes the node as
+// {"termRangeInclusive": {"index": "<name>:<group>", "range": "<interval>"}},
+// where the interval is written as "min max" enclosed in "[" / "]" for an
+// inclusive bound and "(" / ")" for an exclusive one.
+func (t *termRangeInclusiveNode) MarshalJSON() ([]byte, error) {
+ inner := make(map[string]interface{}, 1)
+ var builder strings.Builder
+ if t.minInclusive {
+ builder.WriteString("[")
+ } else {
+ builder.WriteString("(")
+ }
+ builder.WriteString(t.min + " ")
+ builder.WriteString(t.max)
+ if t.maxInclusive {
+ builder.WriteString("]")
+ } else {
+ builder.WriteString(")")
+ }
+ inner["range"] = builder.String()
+ inner["index"] = t.indexRule.Metadata.Name + ":" +
t.indexRule.Metadata.Group
+ data := make(map[string]interface{}, 1)
+ data["termRangeInclusive"] = inner
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (t *termRangeInclusiveNode) String() string {
+ return convert.JSONToString(t)
+}
+
+// termNode describes an exact-term query: the term and the index rule it
+// targets.
+type termNode struct {
+ indexRule *databasev1.IndexRule
+ term string
+}
+
+// newTermNode returns a termNode for term on indexRule.
+func newTermNode(term string, indexRule *databasev1.IndexRule) *termNode {
+ return &termNode{
+ indexRule: indexRule,
+ term: term,
+ }
+}
+
+// MarshalJSON encodes the node as
+// {"term": {"index": "<name>:<group>", "value": "<term>"}}.
+func (t *termNode) MarshalJSON() ([]byte, error) {
+ inner := make(map[string]interface{}, 1)
+ inner["index"] = t.indexRule.Metadata.Name + ":" +
t.indexRule.Metadata.Group
+ inner["value"] = t.term
+ data := make(map[string]interface{}, 1)
+ data["term"] = inner
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (t *termNode) String() string {
+ return convert.JSONToString(t)
+}
+
+// matchNode describes a match query: the text to match and the index rule
+// (including its analyzer) it targets.
+type matchNode struct {
+ indexRule *databasev1.IndexRule
+ match string
+}
+
+// newMatchNode returns a matchNode for match on indexRule.
+func newMatchNode(match string, indexRule *databasev1.IndexRule) *matchNode {
+ return &matchNode{
+ indexRule: indexRule,
+ match: match,
+ }
+}
+
+// MarshalJSON encodes the node as
+// {"match": {"analyzer": "<analyzer name>", "index": "<name>:<group>", "value": "<match>"}}.
+// The analyzer name is looked up from the index rule's analyzer enum value.
+func (m *matchNode) MarshalJSON() ([]byte, error) {
+ inner := make(map[string]interface{}, 1)
+ inner["index"] = m.indexRule.Metadata.Name + ":" +
m.indexRule.Metadata.Group
+ inner["value"] = m.match
+ inner["analyzer"] =
databasev1.IndexRule_Analyzer_name[int32(m.indexRule.Analyzer)]
+ data := make(map[string]interface{}, 1)
+ data["match"] = inner
+ return json.Marshal(data)
+}
+
+// String returns the JSON encoding of the node.
+func (m *matchNode) String() string {
+ return convert.JSONToString(m)
+}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 88b77220..d06129b9 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -24,12 +24,17 @@ import (
)
var (
- errTagNotDefined = errors.New("tag is not defined")
- errUnsupportedConditionOp = errors.New("unsupported condition
operation")
- errUnsupportedConditionValue = errors.New("unsupported condition value
type")
- errInvalidCriteriaType = errors.New("invalid criteria type")
- errIndexNotDefined = errors.New("index is not define for the
tag")
- errIndexSortingUnsupported = errors.New("index does not support
sorting")
+ // ErrUnsupportedConditionOp indicates an unsupported condition
operation.
+ ErrUnsupportedConditionOp = errors.New("unsupported condition
operation")
+ // ErrUnsupportedConditionValue indicates an unsupported condition
value type.
+ ErrUnsupportedConditionValue = errors.New("unsupported condition value
type")
+ // ErrInvalidCriteriaType indicates an invalid criteria type.
+ ErrInvalidCriteriaType = errors.New("invalid criteria type")
+ // ErrInvalidLogicalExpression indicates an invalid logical expression.
+ ErrInvalidLogicalExpression = errors.New("invalid logical expression")
+ errTagNotDefined = errors.New("tag is not defined")
+ errIndexNotDefined = errors.New("index is not define for the
tag")
+ errIndexSortingUnsupported = errors.New("index does not support
sorting")
)
// Tag represents the combination of tag family and tag name.
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index ed0bdeda..fef36557 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -55,6 +55,11 @@ func (f *TagRef) String() string {
return fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(),
f.Spec.Spec.GetType().String())
}
+// Elements returns a slice containing the string representation of TagRef.
+func (f *TagRef) Elements() []string {
+ return []string{fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(),
f.Spec.Spec.GetType().String())}
+}
+
// NewTagRef returns a new TagRef.
func NewTagRef(familyName, tagName string) *TagRef {
return &TagRef{
@@ -85,6 +90,11 @@ func (f *FieldRef) String() string {
return fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(),
f.Spec.Spec.GetFieldType().String())
}
+// Elements returns a slice containing the string representation of FieldRef.
+func (f *FieldRef) Elements() []string {
+ return []string{fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(),
f.Spec.Spec.GetFieldType().String())}
+}
+
// DataType shows the type of the filed's value.
func (f *FieldRef) DataType() int32 {
if f.Spec == nil {
diff --git a/pkg/query/logical/expr_literal.go
b/pkg/query/logical/expr_literal.go
index 0a6b5fd9..c21fdd08 100644
--- a/pkg/query/logical/expr_literal.go
+++ b/pkg/query/logical/expr_literal.go
@@ -39,6 +39,12 @@ type int64Literal struct {
int64
}
+func newInt64Literal(val int64) *int64Literal {
+ return &int64Literal{
+ int64: val,
+ }
+}
+
func (i *int64Literal) Compare(other LiteralExpr) (int, bool) {
if o, ok := other.(*int64Literal); ok {
return int(i.int64 - o.int64), true
@@ -88,6 +94,10 @@ func (i *int64Literal) String() string {
return strconv.FormatInt(i.int64, 10)
}
+func (i *int64Literal) Elements() []string {
+ return []string{strconv.FormatInt(i.int64, 10)}
+}
+
var (
_ LiteralExpr = (*int64ArrLiteral)(nil)
_ ComparableExpr = (*int64ArrLiteral)(nil)
@@ -97,6 +107,12 @@ type int64ArrLiteral struct {
arr []int64
}
+func newInt64ArrLiteral(val []int64) *int64ArrLiteral {
+ return &int64ArrLiteral{
+ arr: val,
+ }
+}
+
func (i *int64ArrLiteral) Compare(other LiteralExpr) (int, bool) {
if o, ok := other.(*int64ArrLiteral); ok {
return 0, slices.Equal(i.arr, o.arr)
@@ -161,6 +177,14 @@ func (i *int64ArrLiteral) String() string {
return fmt.Sprintf("%v", i.arr)
}
+func (i *int64ArrLiteral) Elements() []string {
+ var elements []string
+ for _, v := range i.arr {
+ elements = append(elements, strconv.FormatInt(v, 10))
+ }
+ return elements
+}
+
var (
_ LiteralExpr = (*strLiteral)(nil)
_ ComparableExpr = (*strLiteral)(nil)
@@ -223,6 +247,10 @@ func (s *strLiteral) String() string {
return s.string
}
+func (s *strLiteral) Elements() []string {
+ return []string{s.string}
+}
+
var (
_ LiteralExpr = (*strArrLiteral)(nil)
_ ComparableExpr = (*strArrLiteral)(nil)
@@ -232,6 +260,12 @@ type strArrLiteral struct {
arr []string
}
+func newStrArrLiteral(val []string) *strArrLiteral {
+ return &strArrLiteral{
+ arr: val,
+ }
+}
+
func (s *strArrLiteral) Compare(other LiteralExpr) (int, bool) {
if o, ok := other.(*strArrLiteral); ok {
return 0, StringSlicesEqual(s.arr, o.arr)
@@ -296,34 +330,49 @@ func (s *strArrLiteral) String() string {
return fmt.Sprintf("%v", s.arr)
}
-type bytesLiteral struct {
+func (s *strArrLiteral) Elements() []string {
+ return s.arr
+}
+
+// BytesLiteral represents a wrapper for a slice of bytes.
+type BytesLiteral struct {
bb []byte
}
-func newBytesLiteral(bb []byte) *bytesLiteral {
- return &bytesLiteral{bb: bb}
+// NewBytesLiteral creates a new instance of BytesLiteral with the provided
slice of bytes.
+func NewBytesLiteral(bb []byte) *BytesLiteral {
+ return &BytesLiteral{bb: bb}
}
-func (b *bytesLiteral) Bytes() [][]byte {
+// Bytes returns a 2D slice of bytes where the inner slice contains the byte
slice stored in the BytesLiteral.
+func (b *BytesLiteral) Bytes() [][]byte {
return [][]byte{b.bb}
}
-func (b *bytesLiteral) Equal(expr Expr) bool {
- if other, ok := expr.(*bytesLiteral); ok {
+// Equal checks if the current BytesLiteral is equal to the provided Expr.
+func (b *BytesLiteral) Equal(expr Expr) bool {
+ if other, ok := expr.(*BytesLiteral); ok {
return bytes.Equal(other.bb, b.bb)
}
return false
}
-func (b *bytesLiteral) DataType() int32 {
+// DataType returns the data type of BytesLiteral.
+func (b *BytesLiteral) DataType() int32 {
return int32(databasev1.TagType_TAG_TYPE_DATA_BINARY)
}
-func (b *bytesLiteral) String() string {
+// String converts the BytesLiteral's slice of bytes to a string
representation.
+func (b *BytesLiteral) String() string {
return hex.EncodeToString(b.bb)
}
+// Elements returns a slice containing the string representation of the byte
slice.
+func (b *BytesLiteral) Elements() []string {
+ return []string{hex.EncodeToString(b.bb)}
+}
+
var (
_ LiteralExpr = (*nullLiteral)(nil)
_ ComparableExpr = (*nullLiteral)(nil)
@@ -332,6 +381,10 @@ var (
type nullLiteral struct{}
+func newNullLiteral() *nullLiteral {
+ return nullLiteralExpr
+}
+
func (s nullLiteral) Compare(_ LiteralExpr) (int, bool) {
return 0, false
}
@@ -359,3 +412,7 @@ func (s nullLiteral) DataType() int32 {
func (s nullLiteral) String() string {
return "null"
}
+
+func (s nullLiteral) Elements() []string {
+ return []string{"null"}
+}
diff --git a/pkg/query/logical/interface.go b/pkg/query/logical/interface.go
index dc002cad..0634473a 100644
--- a/pkg/query/logical/interface.go
+++ b/pkg/query/logical/interface.go
@@ -37,6 +37,7 @@ type Plan interface {
// Expr represents a predicate in criteria.
type Expr interface {
fmt.Stringer
+ Elements() []string
DataType() int32
Equal(Expr) bool
}
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 619d4971..a833023f 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -27,7 +27,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
@@ -87,7 +87,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema)
(logical.Plan, error)
// fill AnyEntry by default
entity[idx] = pbv1.AnyTagValue
}
- filter, entities, err := logical.BuildLocalFilter(uis.criteria, s,
entityMap, entity, true)
+ query, entities, _, err := inverted.BuildLocalQuery(uis.criteria, s,
entityMap, entity)
if err != nil {
return nil, err
}
@@ -100,7 +100,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema)
(logical.Plan, error)
projectionTagsRefs: projTagsRefs,
projectionFieldsRefs: projFieldRefs,
metadata: uis.metadata,
- filter: filter,
+ query: query,
entities: entities,
groupByEntity: uis.groupByEntity,
uis: uis,
@@ -114,7 +114,7 @@ var (
)
type localIndexScan struct {
- filter index.Filter
+ query *inverted.Query
schema logical.Schema
uis *unresolvedIndexScan
order *logical.OrderBy
@@ -155,7 +155,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit
executor.MIterator, e
Name: i.metadata.GetName(),
TimeRange: &i.timeRange,
Entities: i.entities,
- Filter: i.filter,
+ Query: i.query,
OrderByType: orderByType,
Order: orderBy,
TagProjection: i.projectionTags,
@@ -172,7 +172,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit
executor.MIterator, e
func (i *localIndexScan) String() string {
return fmt.Sprintf("IndexScan:
startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s;
projection=%s; order=%s;",
i.timeRange.Start.Unix(), i.timeRange.End.Unix(),
i.metadata.GetGroup(), i.metadata.GetName(),
- i.filter, logical.FormatTagRefs(", ", i.projectionTagsRefs...),
i.order)
+ i.query, logical.FormatTagRefs(", ", i.projectionTagsRefs...),
i.order)
}
func (i *localIndexScan) Children() []logical.Plan {
diff --git a/pkg/query/logical/parser.go b/pkg/query/logical/parser.go
new file mode 100644
index 00000000..68f8ad47
--- /dev/null
+++ b/pkg/query/logical/parser.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "github.com/pkg/errors"
+
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// ParseExprOrEntity parses the condition and returns the literal expression
or the entities.
+func ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue,
cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
+ entityIdx, ok := entityDict[cond.Name]
+ if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op !=
modelv1.Condition_BINARY_OP_IN {
+ return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp,
"tag belongs to the entity only supports EQ or IN operation in condition(%v)",
cond)
+ }
+ switch v := cond.Value.Value.(type) {
+ case *modelv1.TagValue_Str:
+ if ok {
+ parsedEntity := make([]*modelv1.TagValue, len(entity))
+ copy(parsedEntity, entity)
+ parsedEntity[entityIdx] = cond.Value
+ return nil, [][]*modelv1.TagValue{parsedEntity}, nil
+ }
+ return str(v.Str.GetValue()), nil, nil
+ case *modelv1.TagValue_StrArray:
+ if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+ entities := make([][]*modelv1.TagValue,
len(v.StrArray.Value))
+ for i, va := range v.StrArray.Value {
+ parsedEntity := make([]*modelv1.TagValue,
len(entity))
+ copy(parsedEntity, entity)
+ parsedEntity[entityIdx] = &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: va,
+ },
+ },
+ }
+ entities[i] = parsedEntity
+ }
+ return nil, entities, nil
+ }
+ return newStrArrLiteral(v.StrArray.GetValue()), nil, nil
+ case *modelv1.TagValue_Int:
+ if ok {
+ parsedEntity := make([]*modelv1.TagValue, len(entity))
+ copy(parsedEntity, entity)
+ parsedEntity[entityIdx] = cond.Value
+ return nil, [][]*modelv1.TagValue{parsedEntity}, nil
+ }
+ return newInt64Literal(v.Int.GetValue()), nil, nil
+ case *modelv1.TagValue_IntArray:
+ if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+ entities := make([][]*modelv1.TagValue,
len(v.IntArray.Value))
+ for i, va := range v.IntArray.Value {
+ parsedEntity := make([]*modelv1.TagValue,
len(entity))
+ copy(parsedEntity, entity)
+ parsedEntity[entityIdx] = &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: va,
+ },
+ },
+ }
+ entities[i] = parsedEntity
+ }
+ return nil, entities, nil
+ }
+ return newInt64ArrLiteral(v.IntArray.GetValue()), nil, nil
+ case *modelv1.TagValue_Null:
+ return newNullLiteral(), nil, nil
+ }
+ return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue,
"index filter parses %v", cond)
+}
+
+// ParseEntities merges entities based on the logical operation.
+func ParseEntities(op modelv1.LogicalExpression_LogicalOp, input
[]*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue {
+ count := len(input)
+ result := make([]*modelv1.TagValue, count)
+ anyEntity := func(entities [][]*modelv1.TagValue) bool {
+ for _, entity := range entities {
+ for _, entry := range entity {
+ if entry != pbv1.AnyTagValue {
+ return false
+ }
+ }
+ }
+ return true
+ }
+ leftAny := anyEntity(left)
+ rightAny := anyEntity(right)
+
+ mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right))
+
+ switch op {
+ case modelv1.LogicalExpression_LOGICAL_OP_AND:
+ if leftAny && !rightAny {
+ return right
+ }
+ if !leftAny && rightAny {
+ return left
+ }
+ mergedEntities = append(mergedEntities, left...)
+ mergedEntities = append(mergedEntities, right...)
+ for i := 0; i < count; i++ {
+ entry := pbv1.AnyTagValue
+ for j := 0; j < len(mergedEntities); j++ {
+ e := mergedEntities[j][i]
+ if e == pbv1.AnyTagValue {
+ continue
+ }
+ if entry == pbv1.AnyTagValue {
+ entry = e
+ } else if pbv1.MustCompareTagValue(entry, e) !=
0 {
+ return nil
+ }
+ }
+ result[i] = entry
+ }
+ case modelv1.LogicalExpression_LOGICAL_OP_OR:
+ if leftAny {
+ return left
+ }
+ if rightAny {
+ return right
+ }
+ mergedEntities = append(mergedEntities, left...)
+ mergedEntities = append(mergedEntities, right...)
+ for i := 0; i < count; i++ {
+ entry := pbv1.AnyTagValue
+ for j := 0; j < len(mergedEntities); j++ {
+ e := mergedEntities[j][i]
+ if entry == pbv1.AnyTagValue {
+ entry = e
+ } else if pbv1.MustCompareTagValue(entry, e) !=
0 {
+ return mergedEntities
+ }
+ }
+ result[i] = entry
+ }
+ }
+ return [][]*modelv1.TagValue{result}
+}
diff --git a/pkg/query/logical/index_filter.go
b/pkg/query/logical/stream/index_filter.go
similarity index 66%
rename from pkg/query/logical/index_filter.go
rename to pkg/query/logical/stream/index_filter.go
index 8450c94e..533000f1 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package logical
+package stream
import (
"bytes"
@@ -28,25 +28,15 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
-var errInvalidLogicalExpression = errors.New("invalid logical expression")
-
-// GlobalIndexError represents a index rule is "global".
-// The local filter can't handle it.
-type GlobalIndexError struct {
- IndexRule *databasev1.IndexRule
- Expr LiteralExpr
-}
-
-func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
-
-// BuildLocalFilter returns a new index.Filter for local indices.
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict
map[string]int,
- entity []*modelv1.TagValue, mandatoryIndexRule bool,
+// buildLocalFilter returns a new index.Filter for local indices.
+func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema,
+ entityDict map[string]int, entity []*modelv1.TagValue,
) (index.Filter, [][]*modelv1.TagValue, error) {
if criteria == nil {
return nil, [][]*modelv1.TagValue{entity}, nil
@@ -54,7 +44,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema
Schema, entityDict map[
switch criteria.GetExp().(type) {
case *modelv1.Criteria_Condition:
cond := criteria.GetCondition()
- expr, parsedEntity, err := parseExprOrEntity(entityDict,
entity, cond)
+ expr, parsedEntity, err :=
logical.ParseExprOrEntity(entityDict, entity, cond)
if err != nil {
return nil, nil, err
}
@@ -62,31 +52,29 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema
Schema, entityDict map[
return nil, parsedEntity, nil
}
if ok, indexRule := schema.IndexDefined(cond.Name); ok {
- return parseCondition(cond, indexRule, expr, entity)
- } else if mandatoryIndexRule {
- return nil, nil,
errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
+ return parseConditionToFilter(cond, indexRule, expr,
entity)
}
return ENode, [][]*modelv1.TagValue{entity}, nil
case *modelv1.Criteria_Le:
le := criteria.GetLe()
if le.GetLeft() == nil && le.GetRight() == nil {
- return nil, nil,
errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of
[%v] are empty", criteria)
+ return nil, nil,
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and
right) of [%v] are empty", criteria)
}
if le.GetLeft() == nil {
- return BuildLocalFilter(le.Right, schema, entityDict,
entity, mandatoryIndexRule)
+ return buildLocalFilter(le.Right, schema, entityDict,
entity)
}
if le.GetRight() == nil {
- return BuildLocalFilter(le.Left, schema, entityDict,
entity, mandatoryIndexRule)
+ return buildLocalFilter(le.Left, schema, entityDict,
entity)
}
- left, leftEntities, err := BuildLocalFilter(le.Left, schema,
entityDict, entity, mandatoryIndexRule)
+ left, leftEntities, err := buildLocalFilter(le.Left, schema,
entityDict, entity)
if err != nil {
return nil, nil, err
}
- right, rightEntities, err := BuildLocalFilter(le.Right, schema,
entityDict, entity, mandatoryIndexRule)
+ right, rightEntities, err := buildLocalFilter(le.Right, schema,
entityDict, entity)
if err != nil {
return nil, nil, err
}
- entities := parseEntities(le.Op, entity, leftEntities,
rightEntities)
+ entities := logical.ParseEntities(le.Op, entity, leftEntities,
rightEntities)
if entities == nil {
return nil, nil, nil
}
@@ -110,10 +98,12 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema
Schema, entityDict map[
return or, entities, nil
}
}
- return nil, nil, errInvalidCriteriaType
+ return nil, nil, logical.ErrInvalidCriteriaType
}
-func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule,
expr LiteralExpr, entity []*modelv1.TagValue) (index.Filter,
[][]*modelv1.TagValue, error) {
+func parseConditionToFilter(cond *modelv1.Condition, indexRule
*databasev1.IndexRule,
+ expr logical.LiteralExpr, entity []*modelv1.TagValue,
+) (index.Filter, [][]*modelv1.TagValue, error) {
switch cond.Op {
case modelv1.Condition_BINARY_OP_GT:
return newRange(indexRule, index.RangeOpts{
@@ -147,7 +137,7 @@ func parseCondition(cond *modelv1.Condition, indexRule
*databasev1.IndexRule, ex
}
and := newAnd(l)
for _, b := range bb {
- and.append(newEq(indexRule, newBytesLiteral(b)))
+ and.append(newEq(indexRule, logical.NewBytesLiteral(b)))
}
return and, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_HAVING:
@@ -158,7 +148,7 @@ func parseCondition(cond *modelv1.Condition, indexRule
*databasev1.IndexRule, ex
}
and := newAnd(l)
for _, b := range bb {
- and.append(newEq(indexRule, newBytesLiteral(b)))
+ and.append(newEq(indexRule, logical.NewBytesLiteral(b)))
}
return newNot(indexRule, and), [][]*modelv1.TagValue{entity},
nil
case modelv1.Condition_BINARY_OP_IN:
@@ -169,7 +159,7 @@ func parseCondition(cond *modelv1.Condition, indexRule
*databasev1.IndexRule, ex
}
or := newOr(l)
for _, b := range bb {
- or.append(newEq(indexRule, newBytesLiteral(b)))
+ or.append(newEq(indexRule, logical.NewBytesLiteral(b)))
}
return or, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_IN:
@@ -180,149 +170,11 @@ func parseCondition(cond *modelv1.Condition, indexRule
*databasev1.IndexRule, ex
}
or := newOr(l)
for _, b := range bb {
- or.append(newEq(indexRule, newBytesLiteral(b)))
+ or.append(newEq(indexRule, logical.NewBytesLiteral(b)))
}
return newNot(indexRule, or), [][]*modelv1.TagValue{entity}, nil
}
- return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index
filter parses %v", cond)
-}
-
-func parseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue,
cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
- entityIdx, ok := entityDict[cond.Name]
- if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op !=
modelv1.Condition_BINARY_OP_IN {
- return nil, nil, errors.WithMessagef(errUnsupportedConditionOp,
"tag belongs to the entity only supports EQ or IN operation in condition(%v)",
cond)
- }
- switch v := cond.Value.Value.(type) {
- case *modelv1.TagValue_Str:
- if ok {
- parsedEntity := make([]*modelv1.TagValue, len(entity))
- copy(parsedEntity, entity)
- parsedEntity[entityIdx] = cond.Value
- return nil, [][]*modelv1.TagValue{parsedEntity}, nil
- }
- return str(v.Str.GetValue()), nil, nil
- case *modelv1.TagValue_StrArray:
- if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
- entities := make([][]*modelv1.TagValue,
len(v.StrArray.Value))
- for i, va := range v.StrArray.Value {
- parsedEntity := make([]*modelv1.TagValue,
len(entity))
- copy(parsedEntity, entity)
- parsedEntity[entityIdx] = &modelv1.TagValue{
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value: va,
- },
- },
- }
- entities[i] = parsedEntity
- }
- return nil, entities, nil
- }
- return &strArrLiteral{
- arr: v.StrArray.GetValue(),
- }, nil, nil
- case *modelv1.TagValue_Int:
- if ok {
- parsedEntity := make([]*modelv1.TagValue, len(entity))
- copy(parsedEntity, entity)
- parsedEntity[entityIdx] = cond.Value
- return nil, [][]*modelv1.TagValue{parsedEntity}, nil
- }
- return &int64Literal{
- int64: v.Int.GetValue(),
- }, nil, nil
- case *modelv1.TagValue_IntArray:
- if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
- entities := make([][]*modelv1.TagValue,
len(v.IntArray.Value))
- for i, va := range v.IntArray.Value {
- parsedEntity := make([]*modelv1.TagValue,
len(entity))
- copy(parsedEntity, entity)
- parsedEntity[entityIdx] = &modelv1.TagValue{
- Value: &modelv1.TagValue_Int{
- Int: &modelv1.Int{
- Value: va,
- },
- },
- }
- entities[i] = parsedEntity
- }
- return nil, entities, nil
- }
- return &int64ArrLiteral{
- arr: v.IntArray.GetValue(),
- }, nil, nil
- case *modelv1.TagValue_Null:
- return nullLiteralExpr, nil, nil
- }
- return nil, nil, errors.WithMessagef(errUnsupportedConditionValue,
"index filter parses %v", cond)
-}
-
-func parseEntities(op modelv1.LogicalExpression_LogicalOp, input
[]*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue {
- count := len(input)
- result := make([]*modelv1.TagValue, count)
- anyEntity := func(entities [][]*modelv1.TagValue) bool {
- for _, entity := range entities {
- for _, entry := range entity {
- if entry != pbv1.AnyTagValue {
- return false
- }
- }
- }
- return true
- }
- leftAny := anyEntity(left)
- rightAny := anyEntity(right)
-
- mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right))
-
- switch op {
- case modelv1.LogicalExpression_LOGICAL_OP_AND:
- if leftAny && !rightAny {
- return right
- }
- if !leftAny && rightAny {
- return left
- }
- mergedEntities = append(mergedEntities, left...)
- mergedEntities = append(mergedEntities, right...)
- for i := 0; i < count; i++ {
- entry := pbv1.AnyTagValue
- for j := 0; j < len(mergedEntities); j++ {
- e := mergedEntities[j][i]
- if e == pbv1.AnyTagValue {
- continue
- }
- if entry == pbv1.AnyTagValue {
- entry = e
- } else if pbv1.MustCompareTagValue(entry, e) !=
0 {
- return nil
- }
- }
- result[i] = entry
- }
- case modelv1.LogicalExpression_LOGICAL_OP_OR:
- if leftAny {
- return left
- }
- if rightAny {
- return right
- }
- mergedEntities = append(mergedEntities, left...)
- mergedEntities = append(mergedEntities, right...)
- for i := 0; i < count; i++ {
- entry := pbv1.AnyTagValue
- for j := 0; j < len(mergedEntities); j++ {
- e := mergedEntities[j][i]
- if entry == pbv1.AnyTagValue {
- entry = e
- } else if pbv1.MustCompareTagValue(entry, e) !=
0 {
- return mergedEntities
- }
- }
- result[i] = entry
- }
- }
- return [][]*modelv1.TagValue{result}
+ return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp,
"index filter parses %v", cond)
}
type fieldKey struct {
@@ -420,7 +272,7 @@ func (an *andNode) MarshalJSON() ([]byte, error) {
}
func (an *andNode) String() string {
- return jsonToString(an)
+ return convert.JSONToString(an)
}
type orNode struct {
@@ -465,13 +317,13 @@ func (on *orNode) MarshalJSON() ([]byte, error) {
}
func (on *orNode) String() string {
- return jsonToString(on)
+ return convert.JSONToString(on)
}
type leaf struct {
index.Filter
Key fieldKey
- Expr LiteralExpr
+ Expr logical.LiteralExpr
}
func (l *leaf) MarshalJSON() ([]byte, error) {
@@ -518,14 +370,14 @@ func (n *not) MarshalJSON() ([]byte, error) {
}
func (n *not) String() string {
- return jsonToString(n)
+ return convert.JSONToString(n)
}
type eq struct {
*leaf
}
-func newEq(indexRule *databasev1.IndexRule, values LiteralExpr) *eq {
+func newEq(indexRule *databasev1.IndexRule, values logical.LiteralExpr) *eq {
return &eq{
leaf: &leaf{
Key: newFieldKey(indexRule),
@@ -552,14 +404,14 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
}
func (eq *eq) String() string {
- return jsonToString(eq)
+ return convert.JSONToString(eq)
}
type match struct {
*leaf
}
-func newMatch(indexRule *databasev1.IndexRule, values LiteralExpr) *match {
+func newMatch(indexRule *databasev1.IndexRule, values logical.LiteralExpr)
*match {
return &match{
leaf: &leaf{
Key: newFieldKey(indexRule),
@@ -591,7 +443,7 @@ func (match *match) MarshalJSON() ([]byte, error) {
}
func (match *match) String() string {
- return jsonToString(match)
+ return convert.JSONToString(match)
}
type rangeOp struct {
@@ -642,15 +494,7 @@ func (r *rangeOp) MarshalJSON() ([]byte, error) {
}
func (r *rangeOp) String() string {
- return jsonToString(r)
-}
-
-func jsonToString(marshaler json.Marshaler) string {
- bb, err := marshaler.MarshalJSON()
- if err != nil {
- return err.Error()
- }
- return string(bb)
+ return convert.JSONToString(r)
}
var (
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index a861dcfa..e7acf94e 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -55,7 +55,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema)
(logical.Plan, error)
entity[idx] = pbv1.AnyTagValue
}
var err error
- ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria,
s, entityDict, entity, false)
+ ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s,
entityDict, entity)
if err != nil {
return nil, err
}
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index 0936e110..445ddfcc 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
)
var errUnsupportedLogicalOperation = errors.New("unsupported logical
operation")
@@ -122,7 +123,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict
map[string]int, index
return or, nil
}
}
- return nil, errInvalidCriteriaType
+ return nil, ErrInvalidCriteriaType
}
func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter,
error) {
@@ -158,7 +159,7 @@ func parseFilter(cond *modelv1.Condition, expr
ComparableExpr) (TagFilter, error
case modelv1.Condition_BINARY_OP_NOT_IN:
return newNotTag(newInTag(cond.Name, expr)), nil
default:
- return nil, errors.WithMessagef(errUnsupportedConditionOp, "tag
filter parses %v", cond)
+ return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag
filter parses %v", cond)
}
}
@@ -181,7 +182,7 @@ func parseExpr(value *modelv1.TagValue) (ComparableExpr,
error) {
case *modelv1.TagValue_Null:
return nullLiteralExpr, nil
}
- return nil, errors.WithMessagef(errUnsupportedConditionValue, "tag
filter parses %v", value)
+ return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag
filter parses %v", value)
}
// DummyFilter matches any predicate.
@@ -261,7 +262,7 @@ func (an *andLogicalNode) MarshalJSON() ([]byte, error) {
}
func (an *andLogicalNode) String() string {
- return jsonToString(an)
+ return convert.JSONToString(an)
}
type orLogicalNode struct {
@@ -296,7 +297,7 @@ func (on *orLogicalNode) MarshalJSON() ([]byte, error) {
}
func (on *orLogicalNode) String() string {
- return jsonToString(on)
+ return convert.JSONToString(on)
}
type tagLeaf struct {
@@ -338,7 +339,7 @@ func (n *notTag) MarshalJSON() ([]byte, error) {
}
func (n *notTag) String() string {
- return jsonToString(n)
+ return convert.JSONToString(n)
}
type inTag struct {
@@ -390,7 +391,7 @@ func (eq *eqTag) MarshalJSON() ([]byte, error) {
}
func (eq *eqTag) String() string {
- return jsonToString(eq)
+ return convert.JSONToString(eq)
}
type rangeOpts struct {
@@ -480,7 +481,7 @@ func (r *rangeTag) MarshalJSON() ([]byte, error) {
}
func (r *rangeTag) String() string {
- return jsonToString(r)
+ return convert.JSONToString(r)
}
func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName
string) (ComparableExpr, error) {
@@ -520,5 +521,5 @@ func (h *havingTag) MarshalJSON() ([]byte, error) {
}
func (h *havingTag) String() string {
- return jsonToString(h)
+ return convert.JSONToString(h)
}
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 539508e4..20e98659 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -25,6 +25,7 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -72,7 +73,7 @@ const (
// MeasureQueryOptions is the options of a measure query.
type MeasureQueryOptions struct {
- Filter index.Filter
+ Query *inverted.Query
TimeRange *timestamp.TimeRange
Order *OrderBy
Name string