This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a73d911ea6ca957ddb7a4f93ea44a1a052be10ff Author: Gao Hongtao <[email protected]> AuthorDate: Wed Dec 7 06:31:57 2022 +0000 Access the metadata cache in the querying procedure. Signed-off-by: Gao Hongtao <[email protected]> --- banyand/query/processor.go | 23 ++++---------- banyand/stream/stream.go | 4 +++ banyand/stream/stream_query.go | 2 ++ banyand/tsdb/seriesdb.go | 1 - pkg/query/logical/measure/measure_analyzer.go | 44 ++++++--------------------- pkg/query/logical/measure/schema.go | 4 --- pkg/query/logical/schema.go | 8 ----- pkg/query/logical/stream/schema.go | 4 --- pkg/query/logical/stream/stream_analyzer.go | 41 +++++-------------------- 9 files changed, 30 insertions(+), 101 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 6171b1a..20b7bd6 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -52,7 +52,8 @@ var ( ) type queryService struct { - log *logger.Logger + log *logger.Logger + // TODO: remove the metaService once https://github.com/apache/skywalking/issues/10121 is fixed. metaService metadata.Service serviceRepo discovery.ServiceRepo pipeline queue.Queue @@ -85,19 +86,13 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - analyzer, err := logical_stream.CreateAnalyzerFromMetaService(p.metaService) - if err != nil { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build analyzer for stream %s: %v", meta.GetName(), err)) - return - } - - s, err := analyzer.BuildSchema(context.TODO(), meta) + s, err := logical_stream.BuildSchema(ec) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for stream %s: %v", meta.GetName(), err)) return } - plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s) + plan, err := logical_stream.Analyze(context.TODO(), queryCriteria, meta, s) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err)) return @@ -143,19 +138,13 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - analyzer, err := logical_measure.CreateAnalyzerFromMetaService(p.metaService) - if err != nil { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build analyzer for measure %s: %v", meta.GetName(), err)) - return - } - - s, err := analyzer.BuildSchema(context.TODO(), meta) + s, err := logical_measure.BuildSchema(ec) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err)) return } - plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s) + plan, err := logical_measure.Analyze(context.TODO(), queryCriteria, meta, s) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err)) return diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index c8866cf..711b713 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -54,6 +54,10 @@ func (s *stream) GetMetadata() *commonv1.Metadata { return s.schema.Metadata } +func (s *stream) GetSchema() *databasev1.Stream { + return s.schema +} + func (s *stream) GetIndexRules() []*databasev1.IndexRule { return s.indexRules } diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go index 5b3ef7c..b2af26d 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/stream/stream_query.go @@ -45,6 +45,8 @@ type Stream interface { Shard(id common.ShardID) (tsdb.Shard, error) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) ParseElementID(item tsdb.Item) (string, error) + GetSchema() *databasev1.Stream + GetIndexRules() []*databasev1.IndexRule } var _ Stream = (*stream)(nil) diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 5f759e4..5b69639 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -329,7 +329,6 @@ func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) { var series string if e := s.l.Debug(); e.Enabled() { - // TODO: store following info when the debug is enabled errDecode = s.seriesMetadata.Put(prepend(seriesID.Marshal(), seriesPrefix), entityValuesBytes) if errDecode != nil { return nil, errDecode diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go index 412362f..e516c23 100644 --- a/pkg/query/logical/measure/measure_analyzer.go +++ b/pkg/query/logical/measure/measure_analyzer.go @@ -22,57 +22,33 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" - "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -// Analyzer analyzes the measure querying expression to the execution plan. -type Analyzer struct { - metadataRepoImpl metadata.Repo -} - -// CreateAnalyzerFromMetaService returns a Analyzer. -func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer, error) { - return &Analyzer{ - metaSvc, - }, nil -} - // BuildSchema returns Schema loaded from the metadata repository. -func (a *Analyzer) BuildSchema(ctx context.Context, metadata *commonv1.Metadata) (logical.Schema, error) { - group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx, metadata.GetGroup()) - if err != nil { - return nil, err - } - measure, err := a.metadataRepoImpl.MeasureRegistry().GetMeasure(ctx, metadata) - if err != nil { - return nil, err - } - - indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata) - if err != nil { - return nil, err - } +func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) { + md := measureSchema.GetSchema() + md.GetEntity() ms := &schema{ common: &logical.CommonSchema{ - Group: group, - IndexRules: indexRules, + IndexRules: measureSchema.GetIndexRules(), TagMap: make(map[string]*logical.TagSpec), - EntityList: measure.GetEntity().GetTagNames(), + EntityList: md.GetEntity().GetTagNames(), }, - measure: measure, + measure: md, fieldMap: make(map[string]*logical.FieldSpec), } - for tagFamilyIdx, tagFamily := range measure.GetTagFamilies() { + for tagFamilyIdx, tagFamily := range md.GetTagFamilies() { for tagIdx, spec := range tagFamily.GetTags() { ms.registerTag(tagFamilyIdx, tagIdx, spec) } } - for fieldIdx, spec := range measure.GetFields() { + for fieldIdx, spec := range md.GetFields() { ms.registerField(fieldIdx, spec) } @@ -80,7 +56,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata *commonv1.Metadata) } // Analyze converts logical expressions to executable operation tree represented by Plan. -func (a *Analyzer) Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) { +func Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) { groupByEntity := false var groupByTags [][]*logical.Tag if criteria.GetGroupBy() != nil { diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go index 7161a09..8a337aa 100644 --- a/pkg/query/logical/measure/schema.go +++ b/pkg/query/logical/measure/schema.go @@ -104,10 +104,6 @@ func (m *schema) Equal(s2 logical.Schema) bool { return false } -func (m *schema) ShardNumber() uint32 { - return m.common.ShardNumber() -} - // registerTag registers the tag spec with given tagFamilyIdx and tagIdx. func (m *schema) registerTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) { m.common.RegisterTag(tagFamilyIdx, tagIdx, spec) diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go index fedac76..4512b9d 100644 --- a/pkg/query/logical/schema.go +++ b/pkg/query/logical/schema.go @@ -20,7 +20,6 @@ package logical import ( "github.com/pkg/errors" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" ) @@ -36,7 +35,6 @@ type Schema interface { ProjTags(refs ...[]*TagRef) Schema ProjFields(refs ...*FieldRef) Schema Equal(Schema) bool - ShardNumber() uint32 } // TagSpec wraps offsets to access a tag in the raw data swiftly. @@ -55,7 +53,6 @@ func (fs *TagSpec) Equal(other *TagSpec) bool { // CommonSchema represents a sharable fields between independent schemas. // It provides common access methods at the same time. type CommonSchema struct { - Group *commonv1.Group IndexRules []*databasev1.IndexRule TagMap map[string]*TagSpec EntityList []string @@ -92,11 +89,6 @@ func (cs *CommonSchema) RegisterTag(tagFamilyIdx, tagIdx int, spec *databasev1.T } } -// ShardNumber returns the number of shards defined by the schema. -func (cs *CommonSchema) ShardNumber() uint32 { - return cs.Group.ResourceOpts.ShardNum -} - // IndexDefined checks whether the field given is indexed. func (cs *CommonSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { for _, idxRule := range cs.IndexRules { diff --git a/pkg/query/logical/stream/schema.go b/pkg/query/logical/stream/schema.go index 3961fb3..fc7bf59 100644 --- a/pkg/query/logical/stream/schema.go +++ b/pkg/query/logical/stream/schema.go @@ -86,10 +86,6 @@ func (s *schema) ProjFields(...*logical.FieldRef) logical.Schema { panic("stream does not support field") } -func (s *schema) ShardNumber() uint32 { - return s.common.ShardNumber() -} - func (s *schema) Scope() tsdb.Entry { return tsdb.Entry(s.stream.Metadata.Name) } diff --git a/pkg/query/logical/stream/stream_analyzer.go b/pkg/query/logical/stream/stream_analyzer.go index 2087bf4..5a9ee5e 100644 --- a/pkg/query/logical/stream/stream_analyzer.go +++ b/pkg/query/logical/stream/stream_analyzer.go @@ -22,50 +22,25 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -// Analyzer analyzes the stream querying expression to the execution plan. -type Analyzer struct { - metadataRepoImpl metadata.Repo -} - -// CreateAnalyzerFromMetaService returns a Analyzer. -func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer, error) { - return &Analyzer{ - metaSvc, - }, nil -} - // BuildSchema returns Schema loaded from the metadata repository. -func (a *Analyzer) BuildSchema(ctx context.Context, metadata *commonv1.Metadata) (logical.Schema, error) { - group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx, metadata.GetGroup()) - if err != nil { - return nil, err - } - stream, err := a.metadataRepoImpl.StreamRegistry().GetStream(ctx, metadata) - if err != nil { - return nil, err - } - - indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata) - if err != nil { - return nil, err - } +func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) { + sm := streamSchema.GetSchema() s := &schema{ common: &logical.CommonSchema{ - Group: group, - IndexRules: indexRules, + IndexRules: streamSchema.GetIndexRules(), TagMap: make(map[string]*logical.TagSpec), - EntityList: stream.GetEntity().GetTagNames(), + EntityList: sm.GetEntity().GetTagNames(), }, - stream: stream, + stream: sm, } // generate the streamSchema of the fields for the traceSeries - for tagFamilyIdx, tagFamily := range stream.GetTagFamilies() { + for tagFamilyIdx, tagFamily := range sm.GetTagFamilies() { for tagIdx, spec := range tagFamily.GetTags() { s.registerTag(tagFamilyIdx, tagIdx, spec) } @@ -75,7 +50,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata *commonv1.Metadata) } // Analyze converts logical expressions to executable operation tree represented by Plan. -func (a *Analyzer) Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) { +func Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) { // parse fields plan := parseTags(criteria, metadata)
