This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch unify-index-filters
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 691d567d28a2e41334c79ac3c5e03e1501d3c653
Author: Megrez Lu <[email protected]>
AuthorDate: Tue Jan 10 16:59:38 2023 +0800

    unify filter
---
 banyand/measure/measure_topn.go | 86 +++++------------------------------------
 pkg/pb/v1/write.go              | 19 +++++++++
 pkg/query/logical/schema.go     | 19 ++++++++-
 pkg/query/logical/tag_filter.go | 13 +++++--
 4 files changed, 55 insertions(+), 82 deletions(-)

diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 4d51099..4dd50f3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -43,6 +43,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 const (
@@ -56,8 +57,6 @@ var (
        _ io.Closer = (*topNProcessorManager)(nil)
        _ flow.Sink = (*topNStreamingProcessor)(nil)
 
-       errUnsupportedConditionValueType = errors.New("unsupported value type 
in the condition")
-
        // TopNValueFieldSpec denotes the field specification of the topN 
calculated result.
        TopNValueFieldSpec = &databasev1.FieldSpec{
                Name:              "value",
@@ -393,88 +392,23 @@ func (manager *topNProcessorManager) buildFilter(criteria 
*modelv1.Criteria) (fl
                }, nil
        }
 
-       f, err := manager.buildFilterForCriteria(criteria)
+       f, err := logical.BuildSimpleTagFilter(criteria)
        if err != nil {
                return nil, err
        }
 
        return func(_ context.Context, dataPoint any) bool {
-               tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
-               return f.predicate(tfs)
+               tffw := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+               tfs := pbv1.AttachSchema(tffw, manager.m.schema)
+               ok, matchErr := f.Match(tfs)
+               if matchErr != nil {
+                       manager.l.Err(matchErr).Msg("fail to match criteria")
+                       return false
+               }
+               return ok
        }, nil
 }
 
-func (manager *topNProcessorManager) buildFilterForCriteria(criteria 
*modelv1.Criteria) (conditionFilter, error) {
-       switch v := criteria.GetExp().(type) {
-       case *modelv1.Criteria_Condition:
-               return manager.buildFilterForCondition(v.Condition)
-       case *modelv1.Criteria_Le:
-               return manager.buildFilterForLogicalExpr(v.Le)
-       default:
-               return nil, errors.New("should not reach here")
-       }
-}
-
-// buildFilterForCondition builds a logical and composable filter for a 
logical expression which have underlying conditions,
-// or nested logical expressions as its children.
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr 
*modelv1.LogicalExpression) (conditionFilter, error) {
-       left, lErr := manager.buildFilterForCriteria(logicalExpr.Left)
-       if lErr != nil {
-               return nil, lErr
-       }
-       right, rErr := manager.buildFilterForCriteria(logicalExpr.Right)
-       if rErr != nil {
-               return nil, rErr
-       }
-       return composeWithOp(left, right, logicalExpr.Op), nil
-}
-
-func composeWithOp(left, right conditionFilter, op 
modelv1.LogicalExpression_LogicalOp) conditionFilter {
-       if op == modelv1.LogicalExpression_LOGICAL_OP_AND {
-               return &andFilter{left, right}
-       }
-       return &orFilter{left, right}
-}
-
-// buildFilterForCondition builds a single, composable filter for a single 
condition.
-func (manager *topNProcessorManager) buildFilterForCondition(cond 
*modelv1.Condition) (conditionFilter, error) {
-       familyOffset, tagOffset, spec := 
pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName())
-       if spec == nil {
-               return nil, errors.New("fail to parse tag by name")
-       }
-       switch v := cond.GetValue().GetValue().(type) {
-       case *modelv1.TagValue_Int:
-               return &int64TagFilter{
-                       TagLocator: partition.TagLocator{
-                               FamilyOffset: familyOffset,
-                               TagOffset:    tagOffset,
-                       },
-                       op:  cond.GetOp(),
-                       val: v.Int.GetValue(),
-               }, nil
-       case *modelv1.TagValue_Str:
-               return &strTagFilter{
-                       TagLocator: partition.TagLocator{
-                               FamilyOffset: familyOffset,
-                               TagOffset:    tagOffset,
-                       },
-                       op:  cond.GetOp(),
-                       val: v.Str.GetValue(),
-               }, nil
-       case *modelv1.TagValue_Id:
-               return &idTagFilter{
-                       TagLocator: partition.TagLocator{
-                               FamilyOffset: familyOffset,
-                               TagOffset:    tagOffset,
-                       },
-                       op:  cond.GetOp(),
-                       val: v.Id.GetValue(),
-               }, nil
-       default:
-               return nil, errUnsupportedConditionValueType
-       }
-}
-
 func (manager *topNProcessorManager) buildMapper(fieldName string, 
groupByNames ...string) (flow.UnaryFunc[any], error) {
        fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), 
func(spec *databasev1.FieldSpec) bool {
                return spec.GetName() == fieldName
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 03c470b..a8147c0 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -217,3 +217,22 @@ func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, 
time.Duration, error) {
                CompressionMethod: databasev1.CompressionMethod(int32(b[0] & 
0x0F)),
        }, time.Duration(convert.BytesToInt64(b[1:])), nil
 }
+
+func AttachSchema(tffws []*modelv1.TagFamilyForWrite, measure 
*databasev1.Measure) []*modelv1.TagFamily {
+       tfs := make([]*modelv1.TagFamily, len(tffws))
+       for tagFamilyIdx := 0; tagFamilyIdx < len(tffws); tagFamilyIdx++ {
+               tffw := tffws[tagFamilyIdx]
+               tags := make([]*modelv1.Tag, len(tffw.GetTags()))
+               for tagIndex := 0; tagIndex < len(tffw.GetTags()); tagIndex++ {
+                       tags[tagIndex] = &modelv1.Tag{
+                               Key:   
measure.TagFamilies[tagFamilyIdx].Tags[tagIndex].Name,
+                               Value: tffw.GetTags()[tagIndex],
+                       }
+               }
+               tfs[tagFamilyIdx] = &modelv1.TagFamily{
+                       Name: measure.TagFamilies[tagFamilyIdx].Name,
+                       Tags: tags,
+               }
+       }
+       return tfs
+}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 4512b9d..ac320a9 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -24,12 +24,27 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
 )
 
+// IndexChecker allows checking the existence of a specific index rule.
+type IndexChecker interface {
+       IndexDefined(tagName string) (bool, *databasev1.IndexRule)
+       IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule)
+}
+
+type emptyIndexChecker struct{}
+
+func (emptyIndexChecker) IndexDefined(tagName string) (bool, 
*databasev1.IndexRule) {
+       return false, nil
+}
+
+func (emptyIndexChecker) IndexRuleDefined(ruleName string) (bool, 
*databasev1.IndexRule) {
+       return false, nil
+}
+
 // Schema allows retrieving schemas in a convenient way.
 type Schema interface {
+       IndexChecker
        Scope() tsdb.Entry
        EntityList() []string
-       IndexDefined(tagName string) (bool, *databasev1.IndexRule)
-       IndexRuleDefined(string) (bool, *databasev1.IndexRule)
        CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
        CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
        ProjTags(refs ...[]*TagRef) Schema
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cafd665..03b6979 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,8 +35,13 @@ type TagFilter interface {
        Match(tagFamilies []*modelv1.TagFamily) (bool, error)
 }
 
+// BuildSimpleTagFilter returns a TagFilter without any local-index, global-index,
or sharding-key support.
+func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
+       return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
+}
+
 // BuildTagFilter returns a TagFilter if predicates doesn't match any indices.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
        if criteria == nil {
                return DummyFilter, nil
        }
@@ -47,7 +52,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, schem
                if err != nil {
                        return nil, err
                }
-               if ok, _ := schema.IndexDefined(cond.Name); ok {
+               if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
                        return DummyFilter, nil
                }
                if _, ok := entityDict[cond.Name]; ok {
@@ -56,11 +61,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, schem
                return parseFilter(cond, expr)
        case *modelv1.Criteria_Le:
                le := criteria.GetLe()
-               left, err := BuildTagFilter(le.Left, entityDict, schema, 
hasGlobalIndex)
+               left, err := BuildTagFilter(le.Left, entityDict, indexChecker, 
hasGlobalIndex)
                if err != nil {
                        return nil, err
                }
-               right, err := BuildTagFilter(le.Right, entityDict, schema, 
hasGlobalIndex)
+               right, err := BuildTagFilter(le.Right, entityDict, 
indexChecker, hasGlobalIndex)
                if err != nil {
                        return nil, err
                }

Reply via email to