This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch unify-index-filters
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 691d567d28a2e41334c79ac3c5e03e1501d3c653 Author: Megrez Lu <[email protected]> AuthorDate: Tue Jan 10 16:59:38 2023 +0800 unify filter --- banyand/measure/measure_topn.go | 86 +++++------------------------------------ pkg/pb/v1/write.go | 19 +++++++++ pkg/query/logical/schema.go | 19 ++++++++- pkg/query/logical/tag_filter.go | 13 +++++-- 4 files changed, 55 insertions(+), 82 deletions(-) diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go index 4d51099..4dd50f3 100644 --- a/banyand/measure/measure_topn.go +++ b/banyand/measure/measure_topn.go @@ -43,6 +43,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) const ( @@ -56,8 +57,6 @@ var ( _ io.Closer = (*topNProcessorManager)(nil) _ flow.Sink = (*topNStreamingProcessor)(nil) - errUnsupportedConditionValueType = errors.New("unsupported value type in the condition") - // TopNValueFieldSpec denotes the field specification of the topN calculated result. 
TopNValueFieldSpec = &databasev1.FieldSpec{ Name: "value", @@ -393,88 +392,23 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl }, nil } - f, err := manager.buildFilterForCriteria(criteria) + f, err := logical.BuildSimpleTagFilter(criteria) if err != nil { return nil, err } return func(_ context.Context, dataPoint any) bool { - tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies() - return f.predicate(tfs) + tffw := dataPoint.(*measurev1.DataPointValue).GetTagFamilies() + tfs := pbv1.AttachSchema(tffw, manager.m.schema) + ok, matchErr := f.Match(tfs) + if matchErr != nil { + manager.l.Err(matchErr).Msg("fail to match criteria") + return false + } + return ok }, nil } -func (manager *topNProcessorManager) buildFilterForCriteria(criteria *modelv1.Criteria) (conditionFilter, error) { - switch v := criteria.GetExp().(type) { - case *modelv1.Criteria_Condition: - return manager.buildFilterForCondition(v.Condition) - case *modelv1.Criteria_Le: - return manager.buildFilterForLogicalExpr(v.Le) - default: - return nil, errors.New("should not reach here") - } -} - -// buildFilterForCondition builds a logical and composable filter for a logical expression which have underlying conditions, -// or nested logical expressions as its children. 
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr *modelv1.LogicalExpression) (conditionFilter, error) { - left, lErr := manager.buildFilterForCriteria(logicalExpr.Left) - if lErr != nil { - return nil, lErr - } - right, rErr := manager.buildFilterForCriteria(logicalExpr.Right) - if rErr != nil { - return nil, rErr - } - return composeWithOp(left, right, logicalExpr.Op), nil -} - -func composeWithOp(left, right conditionFilter, op modelv1.LogicalExpression_LogicalOp) conditionFilter { - if op == modelv1.LogicalExpression_LOGICAL_OP_AND { - return &andFilter{left, right} - } - return &orFilter{left, right} -} - -// buildFilterForCondition builds a single, composable filter for a single condition. -func (manager *topNProcessorManager) buildFilterForCondition(cond *modelv1.Condition) (conditionFilter, error) { - familyOffset, tagOffset, spec := pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName()) - if spec == nil { - return nil, errors.New("fail to parse tag by name") - } - switch v := cond.GetValue().GetValue().(type) { - case *modelv1.TagValue_Int: - return &int64TagFilter{ - TagLocator: partition.TagLocator{ - FamilyOffset: familyOffset, - TagOffset: tagOffset, - }, - op: cond.GetOp(), - val: v.Int.GetValue(), - }, nil - case *modelv1.TagValue_Str: - return &strTagFilter{ - TagLocator: partition.TagLocator{ - FamilyOffset: familyOffset, - TagOffset: tagOffset, - }, - op: cond.GetOp(), - val: v.Str.GetValue(), - }, nil - case *modelv1.TagValue_Id: - return &idTagFilter{ - TagLocator: partition.TagLocator{ - FamilyOffset: familyOffset, - TagOffset: tagOffset, - }, - op: cond.GetOp(), - val: v.Id.GetValue(), - }, nil - default: - return nil, errUnsupportedConditionValueType - } -} - func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) { fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool { return 
spec.GetName() == fieldName diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go index 03c470b..a8147c0 100644 --- a/pkg/pb/v1/write.go +++ b/pkg/pb/v1/write.go @@ -217,3 +217,22 @@ func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) { CompressionMethod: databasev1.CompressionMethod(int32(b[0] & 0x0F)), }, time.Duration(convert.BytesToInt64(b[1:])), nil } + +func AttachSchema(tffws []*modelv1.TagFamilyForWrite, measure *databasev1.Measure) []*modelv1.TagFamily { + tfs := make([]*modelv1.TagFamily, len(tffws)) + for tagFamilyIdx := 0; tagFamilyIdx < len(tffws); tagFamilyIdx++ { + tffw := tffws[tagFamilyIdx] + tags := make([]*modelv1.Tag, len(tffw.GetTags())) + for tagIndex := 0; tagIndex < len(tffw.GetTags()); tagIndex++ { + tags[tagIndex] = &modelv1.Tag{ + Key: measure.TagFamilies[tagFamilyIdx].Tags[tagIndex].Name, + Value: tffw.GetTags()[tagIndex], + } + } + tfs[tagFamilyIdx] = &modelv1.TagFamily{ + Name: measure.TagFamilies[tagFamilyIdx].Name, + Tags: tags, + } + } + return tfs +} diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go index 4512b9d..ac320a9 100644 --- a/pkg/query/logical/schema.go +++ b/pkg/query/logical/schema.go @@ -24,12 +24,27 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb" ) +// IndexChecker allows checking the existence of a specific index rule +type IndexChecker interface { + IndexDefined(tagName string) (bool, *databasev1.IndexRule) + IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule) +} + +type emptyIndexChecker struct{} + +func (emptyIndexChecker) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { + return false, nil +} + +func (emptyIndexChecker) IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule) { + return false, nil +} + // Schema allows retrieving schemas in a convenient way. 
type Schema interface { + IndexChecker Scope() tsdb.Entry EntityList() []string - IndexDefined(tagName string) (bool, *databasev1.IndexRule) - IndexRuleDefined(string) (bool, *databasev1.IndexRule) CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error) CreateFieldRef(fields ...*Field) ([]*FieldRef, error) ProjTags(refs ...[]*TagRef) Schema diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index cafd665..03b6979 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -35,8 +35,13 @@ type TagFilter interface { Match(tagFamilies []*modelv1.TagFamily) (bool, error) } +// BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support. +func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) { + return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false) +} + // BuildTagFilter returns a TagFilter if predicates doesn't match any indices. -func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) { +func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) { if criteria == nil { return DummyFilter, nil } @@ -47,7 +52,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem if err != nil { return nil, err } - if ok, _ := schema.IndexDefined(cond.Name); ok { + if ok, _ := indexChecker.IndexDefined(cond.Name); ok { return DummyFilter, nil } if _, ok := entityDict[cond.Name]; ok { @@ -56,11 +61,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem return parseFilter(cond, expr) case *modelv1.Criteria_Le: le := criteria.GetLe() - left, err := BuildTagFilter(le.Left, entityDict, schema, hasGlobalIndex) + left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex) if err != nil { return nil, err } - right, err := 
BuildTagFilter(le.Right, entityDict, schema, hasGlobalIndex) + right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex) if err != nil { return nil, err }
