This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a73d911ea6ca957ddb7a4f93ea44a1a052be10ff
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 7 06:31:57 2022 +0000

    Access the metadata cache in the querying procedure.
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/query/processor.go                    | 23 ++++----------
 banyand/stream/stream.go                      |  4 +++
 banyand/stream/stream_query.go                |  2 ++
 banyand/tsdb/seriesdb.go                      |  1 -
 pkg/query/logical/measure/measure_analyzer.go | 44 ++++++---------------------
 pkg/query/logical/measure/schema.go           |  4 ---
 pkg/query/logical/schema.go                   |  8 -----
 pkg/query/logical/stream/schema.go            |  4 ---
 pkg/query/logical/stream/stream_analyzer.go   | 41 +++++--------------------
 9 files changed, 30 insertions(+), 101 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 6171b1a..20b7bd6 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -52,7 +52,8 @@ var (
 )
 
 type queryService struct {
-       log         *logger.Logger
+       log *logger.Logger
+       // TODO: remove the metaService once https://github.com/apache/skywalking/issues/10121 is fixed.
        metaService metadata.Service
        serviceRepo discovery.ServiceRepo
        pipeline    queue.Queue
@@ -85,19 +86,13 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                return
        }
 
-       analyzer, err := 
logical_stream.CreateAnalyzerFromMetaService(p.metaService)
-       if err != nil {
-               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build analyzer for stream %s: %v", meta.GetName(), err))
-               return
-       }
-
-       s, err := analyzer.BuildSchema(context.TODO(), meta)
+       s, err := logical_stream.BuildSchema(ec)
        if err != nil {
                 resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for stream %s: %v", meta.GetName(), err))
                return
        }
 
-       plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
+       plan, err := logical_stream.Analyze(context.TODO(), queryCriteria, 
meta, s)
        if err != nil {
                 resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err))
                return
@@ -143,19 +138,13 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                return
        }
 
-       analyzer, err := 
logical_measure.CreateAnalyzerFromMetaService(p.metaService)
-       if err != nil {
-               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build analyzer for measure %s: %v", meta.GetName(), err))
-               return
-       }
-
-       s, err := analyzer.BuildSchema(context.TODO(), meta)
+       s, err := logical_measure.BuildSchema(ec)
        if err != nil {
                 resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err))
                return
        }
 
-       plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
+       plan, err := logical_measure.Analyze(context.TODO(), queryCriteria, 
meta, s)
        if err != nil {
                 resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err))
                return
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index c8866cf..711b713 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -54,6 +54,10 @@ func (s *stream) GetMetadata() *commonv1.Metadata {
        return s.schema.Metadata
 }
 
+func (s *stream) GetSchema() *databasev1.Stream {
+       return s.schema
+}
+
 func (s *stream) GetIndexRules() []*databasev1.IndexRule {
        return s.indexRules
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 5b3ef7c..b2af26d 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -45,6 +45,8 @@ type Stream interface {
        Shard(id common.ShardID) (tsdb.Shard, error)
        ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, 
error)
        ParseElementID(item tsdb.Item) (string, error)
+       GetSchema() *databasev1.Stream
+       GetIndexRules() []*databasev1.IndexRule
 }
 
 var _ Stream = (*stream)(nil)
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 5f759e4..5b69639 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -329,7 +329,6 @@ func (s *seriesDB) Get(key []byte, entityValues 
EntityValues) (Series, error) {
 
                var series string
                if e := s.l.Debug(); e.Enabled() {
-                       // TODO: store following info when the debug is enabled
                        errDecode = 
s.seriesMetadata.Put(prepend(seriesID.Marshal(), seriesPrefix), 
entityValuesBytes)
                        if errDecode != nil {
                                return nil, errDecode
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index 412362f..e516c23 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -22,57 +22,33 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
-// Analyzer analyzes the measure querying expression to the execution plan.
-type Analyzer struct {
-       metadataRepoImpl metadata.Repo
-}
-
-// CreateAnalyzerFromMetaService returns a Analyzer.
-func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer, 
error) {
-       return &Analyzer{
-               metaSvc,
-       }, nil
-}
-
 // BuildSchema returns Schema loaded from the metadata repository.
-func (a *Analyzer) BuildSchema(ctx context.Context, metadata 
*commonv1.Metadata) (logical.Schema, error) {
-       group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx, 
metadata.GetGroup())
-       if err != nil {
-               return nil, err
-       }
-       measure, err := a.metadataRepoImpl.MeasureRegistry().GetMeasure(ctx, 
metadata)
-       if err != nil {
-               return nil, err
-       }
-
-       indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata)
-       if err != nil {
-               return nil, err
-       }
+func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) {
+       md := measureSchema.GetSchema()
+       md.GetEntity()
 
        ms := &schema{
                common: &logical.CommonSchema{
-                       Group:      group,
-                       IndexRules: indexRules,
+                       IndexRules: measureSchema.GetIndexRules(),
                        TagMap:     make(map[string]*logical.TagSpec),
-                       EntityList: measure.GetEntity().GetTagNames(),
+                       EntityList: md.GetEntity().GetTagNames(),
                },
-               measure:  measure,
+               measure:  md,
                fieldMap: make(map[string]*logical.FieldSpec),
        }
 
-       for tagFamilyIdx, tagFamily := range measure.GetTagFamilies() {
+       for tagFamilyIdx, tagFamily := range md.GetTagFamilies() {
                for tagIdx, spec := range tagFamily.GetTags() {
                        ms.registerTag(tagFamilyIdx, tagIdx, spec)
                }
        }
 
-       for fieldIdx, spec := range measure.GetFields() {
+       for fieldIdx, spec := range md.GetFields() {
                ms.registerField(fieldIdx, spec)
        }
 
@@ -80,7 +56,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata 
*commonv1.Metadata)
 }
 
 // Analyze converts logical expressions to executable operation tree represented by Plan.
-func (a *Analyzer) Analyze(_ context.Context, criteria 
*measurev1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) 
(logical.Plan, error) {
+func Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata 
*commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
        groupByEntity := false
        var groupByTags [][]*logical.Tag
        if criteria.GetGroupBy() != nil {
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index 7161a09..8a337aa 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -104,10 +104,6 @@ func (m *schema) Equal(s2 logical.Schema) bool {
        return false
 }
 
-func (m *schema) ShardNumber() uint32 {
-       return m.common.ShardNumber()
-}
-
 // registerTag registers the tag spec with given tagFamilyIdx and tagIdx.
 func (m *schema) registerTag(tagFamilyIdx, tagIdx int, spec 
*databasev1.TagSpec) {
        m.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index fedac76..4512b9d 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -20,7 +20,6 @@ package logical
 import (
        "github.com/pkg/errors"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
 )
@@ -36,7 +35,6 @@ type Schema interface {
        ProjTags(refs ...[]*TagRef) Schema
        ProjFields(refs ...*FieldRef) Schema
        Equal(Schema) bool
-       ShardNumber() uint32
 }
 
 // TagSpec wraps offsets to access a tag in the raw data swiftly.
@@ -55,7 +53,6 @@ func (fs *TagSpec) Equal(other *TagSpec) bool {
 // CommonSchema represents a sharable fields between independent schemas.
 // It provides common access methods at the same time.
 type CommonSchema struct {
-       Group      *commonv1.Group
        IndexRules []*databasev1.IndexRule
        TagMap     map[string]*TagSpec
        EntityList []string
@@ -92,11 +89,6 @@ func (cs *CommonSchema) RegisterTag(tagFamilyIdx, tagIdx 
int, spec *databasev1.T
        }
 }
 
-// ShardNumber returns the number of shards defined by the schema.
-func (cs *CommonSchema) ShardNumber() uint32 {
-       return cs.Group.ResourceOpts.ShardNum
-}
-
 // IndexDefined checks whether the field given is indexed.
 func (cs *CommonSchema) IndexDefined(tagName string) (bool, 
*databasev1.IndexRule) {
        for _, idxRule := range cs.IndexRules {
diff --git a/pkg/query/logical/stream/schema.go b/pkg/query/logical/stream/schema.go
index 3961fb3..fc7bf59 100644
--- a/pkg/query/logical/stream/schema.go
+++ b/pkg/query/logical/stream/schema.go
@@ -86,10 +86,6 @@ func (s *schema) ProjFields(...*logical.FieldRef) 
logical.Schema {
        panic("stream does not support field")
 }
 
-func (s *schema) ShardNumber() uint32 {
-       return s.common.ShardNumber()
-}
-
 func (s *schema) Scope() tsdb.Entry {
        return tsdb.Entry(s.stream.Metadata.Name)
 }
diff --git a/pkg/query/logical/stream/stream_analyzer.go b/pkg/query/logical/stream/stream_analyzer.go
index 2087bf4..5a9ee5e 100644
--- a/pkg/query/logical/stream/stream_analyzer.go
+++ b/pkg/query/logical/stream/stream_analyzer.go
@@ -22,50 +22,25 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
-// Analyzer analyzes the stream querying expression to the execution plan.
-type Analyzer struct {
-       metadataRepoImpl metadata.Repo
-}
-
-// CreateAnalyzerFromMetaService returns a Analyzer.
-func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer, 
error) {
-       return &Analyzer{
-               metaSvc,
-       }, nil
-}
-
 // BuildSchema returns Schema loaded from the metadata repository.
-func (a *Analyzer) BuildSchema(ctx context.Context, metadata 
*commonv1.Metadata) (logical.Schema, error) {
-       group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx, 
metadata.GetGroup())
-       if err != nil {
-               return nil, err
-       }
-       stream, err := a.metadataRepoImpl.StreamRegistry().GetStream(ctx, 
metadata)
-       if err != nil {
-               return nil, err
-       }
-
-       indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata)
-       if err != nil {
-               return nil, err
-       }
+func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
+       sm := streamSchema.GetSchema()
 
        s := &schema{
                common: &logical.CommonSchema{
-                       Group:      group,
-                       IndexRules: indexRules,
+                       IndexRules: streamSchema.GetIndexRules(),
                        TagMap:     make(map[string]*logical.TagSpec),
-                       EntityList: stream.GetEntity().GetTagNames(),
+                       EntityList: sm.GetEntity().GetTagNames(),
                },
-               stream: stream,
+               stream: sm,
        }
 
        // generate the streamSchema of the fields for the traceSeries
-       for tagFamilyIdx, tagFamily := range stream.GetTagFamilies() {
+       for tagFamilyIdx, tagFamily := range sm.GetTagFamilies() {
                for tagIdx, spec := range tagFamily.GetTags() {
                        s.registerTag(tagFamilyIdx, tagIdx, spec)
                }
@@ -75,7 +50,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata 
*commonv1.Metadata)
 }
 
 // Analyze converts logical expressions to executable operation tree represented by Plan.
-func (a *Analyzer) Analyze(_ context.Context, criteria *streamv1.QueryRequest, 
metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
+func Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata 
*commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
        // parse fields
        plan := parseTags(criteria, metadata)
 

Reply via email to