This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 4b63ff71 Periodic sync to sync the metadata from the etcd (#600) 4b63ff71 is described below commit 4b63ff7183e2165714a329683a64d2af72e384d1 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Feb 2 13:20:15 2025 +0800 Periodic sync to sync the metadata from the etcd (#600) --- .github/workflows/e2e.storage.yml | 11 + CHANGES.md | 1 + banyand/measure/block.go | 14 +- banyand/measure/measure.go | 103 +++----- banyand/measure/metadata.go | 285 +++++++++++++-------- banyand/measure/query.go | 48 ++-- banyand/measure/topn.go | 235 ++++++++++++----- banyand/measure/write.go | 19 +- banyand/metadata/client.go | 28 +- banyand/metadata/embeddedserver/server.go | 1 + banyand/metadata/schema/etcd.go | 109 ++++++-- banyand/metadata/schema/schema.go | 7 +- banyand/metadata/schema/watcher.go | 257 +++++++++++++------ banyand/metadata/schema/watcher_test.go | 247 +++++++++++++++++- banyand/query/processor_topn.go | 3 +- banyand/stream/benchmark_test.go | 7 +- banyand/stream/block.go | 11 +- banyand/stream/block_scanner.go | 2 + banyand/stream/metadata.go | 130 ++++------ banyand/stream/query.go | 25 +- banyand/stream/query_by_idx.go | 2 +- banyand/stream/query_by_ts.go | 9 +- banyand/stream/stream.go | 80 +++--- banyand/stream/write.go | 10 +- bydbctl/internal/cmd/measure_test.go | 3 +- go.mod | 4 +- pkg/partition/index.go | 6 +- pkg/query/logical/measure/topn_analyzer.go | 6 +- pkg/query/logical/measure/topn_plan_localscan.go | 11 +- .../logical/stream/stream_plan_indexscan_local.go | 23 +- pkg/query/model/model.go | 44 ++++ pkg/query/model/model_test.go | 6 +- pkg/schema/cache.go | 264 ++++++++++--------- pkg/schema/init.go | 188 ++------------ pkg/schema/schema.go | 20 +- scripts/ci/check/version_test.go | 2 +- .../standalone/cold_query/query_suite_test.go | 4 +- .../standalone/query/query_suite_test.go | 4 +- 38 files changed, 1386 insertions(+), 843 deletions(-) diff --git a/.github/workflows/e2e.storage.yml b/.github/workflows/e2e.storage.yml index c6fd07e2..b815ffac 100644 --- a/.github/workflows/e2e.storage.yml +++ b/.github/workflows/e2e.storage.yml @@ -96,6 +96,17 @@ jobs: uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180 with: e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }} + - if: ${{ failure() }} + run: | + df -h + du -sh . + docker images + - uses: actions/upload-artifact@v4 + if: ${{ failure() }} + name: Upload Logs + with: + name: test-logs-${{ matrix.test.name }} + path: "${{ env.SW_INFRA_E2E_LOG_DIR }}" Storage: if: (github.event_name == 'schedule' && github.repository == 'apache/skywalking-banyandb') || (github.event_name != 'schedule') diff --git a/CHANGES.md b/CHANGES.md index 34656715..1db15cdf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,6 +22,7 @@ Release Notes. - Metadata: Wait for the existing registration to be removed before registering the node. - Stream: Introduce the batch scan to improve the performance of the query and limit the memory usage. - Add memory protector to protect the memory usage of the system. It will limit the memory usage of the querying. +- Metadata: Introduce the periodic sync to sync the metadata from the etcd to the local cache in case of the loss of the events. ### Bug Fixes diff --git a/banyand/measure/block.go b/banyand/measure/block.go index 37ce7a3b..80f6ca00 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -692,24 +692,24 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { tmpBlock.reset() cfm := make([]columnMetadata, 0, len(bc.fieldProjection)) NEXT_FIELD: - for j := range bc.fieldProjection { - for i := range bc.bm.field.columnMetadata { - if bc.bm.field.columnMetadata[i].name == bc.fieldProjection[j] { - cfm = append(cfm, bc.bm.field.columnMetadata[i]) + for _, fp := range bc.fieldProjection { + for _, cm := range bc.bm.field.columnMetadata { + if cm.name == fp { + cfm = append(cfm, cm) continue NEXT_FIELD } } cfm = append(cfm, columnMetadata{ - name: bc.fieldProjection[j], + name: fp, valueType: pbv1.ValueTypeUnknown, }) } bc.bm.field.columnMetadata = cfm bc.bm.tagProjection = bc.tagProjection var tf map[string]*dataBlock - for i := range bc.tagProjection { + for _, tp := range bc.tagProjection { for tfName, block := range bc.bm.tagFamilies { - if bc.tagProjection[i].Family == tfName { + if tp.Family == tfName { if tf == nil { tf = make(map[string]*dataBlock, len(bc.tagProjection)) } diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 797bf83c..3edc7127 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -21,17 +21,14 @@ package measure import ( + "sync/atomic" "time" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/protector" - "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" - "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -52,40 +49,33 @@ type option struct { seriesCacheMaxSize run.Bytes } -type measure struct { - databaseSupplier schema.Supplier - pm *protector.Memory +type indexSchema struct { indexTagMap map[string]struct{} - l *logger.Logger - schema *databasev1.Measure - processorManager *topNProcessorManager fieldIndexLocation partition.FieldIndexLocation - name string - group string indexRuleLocators partition.IndexRuleLocator indexRules []*databasev1.IndexRule - topNAggregations []*databasev1.TopNAggregation - interval time.Duration - shardNum uint32 } -func (s *measure) startSteamingManager(pipeline queue.Queue) error { - if len(s.topNAggregations) == 0 { - return nil - } - tagMapSpec := logical.TagSpecMap{} - tagMapSpec.RegisterTagFamilies(s.schema.GetTagFamilies()) - - s.processorManager = &topNProcessorManager{ - l: s.l, - pipeline: pipeline, - m: s, - s: tagMapSpec, - topNSchemas: s.topNAggregations, - processorMap: make(map[*commonv1.Metadata][]*topNStreamingProcessor), +func (i *indexSchema) parse(schema *databasev1.Measure) { + i.indexRuleLocators, i.fieldIndexLocation = partition.ParseIndexRuleLocators(schema.GetEntity(), schema.GetTagFamilies(), i.indexRules, schema.IndexMode) + i.indexTagMap = make(map[string]struct{}) + for j := range i.indexRules { + for k := range i.indexRules[j].Tags { + i.indexTagMap[i.indexRules[j].Tags[k]] = struct{}{} + } } +} - return s.processorManager.start() +type measure struct { + indexSchema atomic.Value + tsdb atomic.Value + pm *protector.Memory + l *logger.Logger + schema *databasev1.Measure + schemaRepo *schemaRepo + name string + group string + interval time.Duration } func (s *measure) GetSchema() *databasev1.Measure { @@ -93,14 +83,18 @@ func (s *measure) GetSchema() *databasev1.Measure { } func (s *measure) GetIndexRules() []*databasev1.IndexRule { - return s.indexRules -} - -func (s *measure) Close() error { - if s.processorManager == nil { + is := s.indexSchema.Load() + if is == nil { return nil } - return s.processorManager.Close() + return is.(indexSchema).indexRules +} + +func (s *measure) OnIndexUpdate(index []*databasev1.IndexRule) { + var is indexSchema + is.indexRules = index + is.parse(s.schema) + s.indexSchema.Store(is) } func (s *measure) parseSpec() (err error) { @@ -108,44 +102,27 @@ func (s *measure) parseSpec() (err error) { if s.schema.Interval != "" { s.interval, err = timestamp.ParseDuration(s.schema.Interval) } - s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, s.schema.IndexMode) - s.indexTagMap = make(map[string]struct{}) - for j := range s.indexRules { - for k := range s.indexRules[j].Tags { - s.indexTagMap[s.indexRules[j].Tags[k]] = struct{}{} - } - } + var is indexSchema + is.parse(s.schema) + s.indexSchema.Store(is) return err } type measureSpec struct { - schema *databasev1.Measure - indexRules []*databasev1.IndexRule - topNAggregations []*databasev1.TopNAggregation + schema *databasev1.Measure } -func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec, - l *logger.Logger, pipeline queue.Queue, pm *protector.Memory, +func openMeasure(spec measureSpec, + l *logger.Logger, pm *protector.Memory, schemaRepo *schemaRepo, ) (*measure, error) { m := &measure{ - shardNum: shardNum, - schema: spec.schema, - indexRules: spec.indexRules, - topNAggregations: spec.topNAggregations, - l: l, - pm: pm, + schema: spec.schema, + l: l, + pm: pm, + schemaRepo: schemaRepo, } if err := m.parseSpec(); err != nil { return nil, err } - if db == nil { - return m, nil - } - - m.databaseSupplier = db - if startErr := m.startSteamingManager(pipeline); startErr != nil { - l.Err(startErr).Str("measure", spec.schema.GetMetadata().GetName()). - Msg("fail to start streaming manager") - } return m, nil } diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 229aa371..b5b8f1cf 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -22,9 +22,12 @@ import ( "fmt" "io" "path" + "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/pkg/errors" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -40,7 +43,27 @@ import ( resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) -var metadataScope = measureScope.SubScope("metadata") +const ( + // TopNSchemaName is the name of the top n result schema. + TopNSchemaName = "_top_n_result" + // TopNTagFamily is the tag family name of the topN result measure. + TopNTagFamily = "_topN" + // TopNFieldName is the field name of the topN result measure. + TopNFieldName = "value" +) + +var ( + metadataScope = measureScope.SubScope("metadata") + + topNFieldsSpec = []*databasev1.FieldSpec{{ + Name: TopNFieldName, + FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY, + EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, + CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, + }} + // TopNTagNames is the tag names of the topN result measure. + TopNTagNames = []string{"name", "direction", "group"} +) // SchemaService allows querying schema information. type SchemaService interface { @@ -49,9 +72,11 @@ type SchemaService interface { } type schemaRepo struct { resourceSchema.Repository - l *logger.Logger - metadata metadata.Repo - path string + metadata metadata.Repo + pipeline queue.Queue + l *logger.Logger + topNProcessorMap sync.Map + path string } func newSchemaRepo(path string, svc *service) *schemaRepo { @@ -59,13 +84,14 @@ func newSchemaRepo(path string, svc *service) *schemaRepo { path: path, l: svc.l, metadata: svc.metadata, - Repository: resourceSchema.NewRepository( - svc.metadata, - svc.l, - newSupplier(path, svc), - resourceSchema.NewMetrics(svc.omr.With(metadataScope)), - ), + pipeline: svc.localPipeline, } + sr.Repository = resourceSchema.NewRepository( + svc.metadata, + svc.l, + newSupplier(path, svc, sr), + resourceSchema.NewMetrics(svc.omr.With(metadataScope)), + ) sr.start() return sr } @@ -106,7 +132,11 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) { logger.Panicf("unexpected kinds: %v", kinds) return false, nil } - return true, sr.Repository.Init(schema.KindMeasure) + groupNames, revs := sr.Repository.Init(schema.KindMeasure) + for i := range groupNames { + sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), groupNames[i]) + } + return true, revs } func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { @@ -125,70 +155,52 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { Kind: resourceSchema.EventKindGroup, Metadata: g, }) + sr.createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), g.Metadata.Name) case schema.KindMeasure: - if err := validate.Measure(metadata.Spec.(*databasev1.Measure)); err != nil { + m := metadata.Spec.(*databasev1.Measure) + if err := validate.Measure(m); err != nil { sr.l.Warn().Err(err).Msg("measure is ignored") return } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, Kind: resourceSchema.EventKindResource, - Metadata: metadata.Spec.(*databasev1.Measure), + Metadata: m, }) case schema.KindIndexRuleBinding: - irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding) - if !ok { - sr.l.Warn().Msg("fail to convert message to IndexRuleBinding") - return - } - if err := validate.IndexRuleBinding(irb); err != nil { - sr.l.Warn().Err(err).Msg("index rule binding is ignored") - return - } - if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - stm, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{ - Name: irb.GetSubject().GetName(), - Group: metadata.Group, - }) - cancel() - if err != nil { + if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok { + if err := validate.IndexRuleBinding(irb); err != nil { + sr.l.Warn().Err(err).Msg("index rule binding is ignored") return } - sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: stm, - }) + if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindIndexRuleBinding, + Metadata: irb, + }) + } } case schema.KindIndexRule: - if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil { - sr.l.Warn().Err(err).Msg("index rule is ignored") - return - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE) - if err != nil { - return - } - for _, sub := range subjects { + if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok { + if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil { + sr.l.Warn().Err(err).Msg("index rule is ignored") + return + } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: sub.(*databasev1.Measure), + Kind: resourceSchema.EventKindIndexRule, + Metadata: ir, }) } case schema.KindTopNAggregation: - if err := validate.TopNAggregation(metadata.Spec.(*databasev1.TopNAggregation)); err != nil { + topNSchema := metadata.Spec.(*databasev1.TopNAggregation) + if err := validate.TopNAggregation(topNSchema); err != nil { sr.l.Warn().Err(err).Msg("topNAggregation is ignored") return } - sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindTopNAgg, - Metadata: metadata.Spec.(*databasev1.TopNAggregation), - }) + manager := sr.getSteamingManager(topNSchema.SourceMeasure, sr.pipeline) + manager.register(topNSchema) default: } } @@ -206,40 +218,52 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) { Metadata: g, }) case schema.KindMeasure: + m := metadata.Spec.(*databasev1.Measure) sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventDelete, Kind: resourceSchema.EventKindResource, - Metadata: metadata.Spec.(*databasev1.Measure), + Metadata: m, }) + sr.stopSteamingManager(m.GetMetadata()) case schema.KindIndexRuleBinding: - if metadata.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - m, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{ - Name: metadata.Name, - Group: metadata.Group, - }) - if err != nil { - return + if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok { + if binding.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindIndexRuleBinding, + Metadata: metadata.Spec.(*databasev1.IndexRuleBinding), + }) } - // we should update instead of delete + } + + case schema.KindIndexRule: + if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok { sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: m, + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindIndexRule, + Metadata: rule, }) } - case schema.KindIndexRule: case schema.KindTopNAggregation: - sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindTopNAgg, - Metadata: metadata.Spec.(*databasev1.TopNAggregation), - }) + topNAggregation := metadata.Spec.(*databasev1.TopNAggregation) + sr.stopSteamingManager(topNAggregation.SourceMeasure) default: } } +func (sr *schemaRepo) Close() { + var err error + sr.topNProcessorMap.Range(func(_, val any) bool { + manager := val.(*topNProcessorManager) + err = multierr.Append(err, manager.Close()) + return true + }) + if err != nil { + sr.l.Error().Err(err).Msg("faced error when closing schema repository") + } + sr.Repository.Close() +} + func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) { r, ok := sr.LoadResource(metadata) if !ok { @@ -250,6 +274,9 @@ func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) } func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option], error) { + if sr == nil { + return nil, fmt.Errorf("schemaRepo is nil") + } g, ok := sr.LoadGroup(groupName) if !ok { return nil, fmt.Errorf("group %s not found", groupName) @@ -261,37 +288,64 @@ func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option] return db.(storage.TSDB[*tsTable, option]), nil } +func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) { + md := GetTopNSchemaMetadata(group) + operation := func() error { + m, err := measureSchemaRegistry.GetMeasure(ctx, md) + if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) { + return errors.WithMessagef(err, "fail to get %s", md) + } + if m != nil { + return nil + } + + m = GetTopNSchema(md) + if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr != nil { + if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) { + return errors.WithMessagef(innerErr, "fail to create new topN measure %s", m) + } + } + return nil + } + + backoffStrategy := backoff.NewExponentialBackOff() + backoffStrategy.MaxElapsedTime = 2 * time.Minute + + err := backoff.Retry(operation, backoffStrategy) + if err != nil { + logger.Panicf("fail to create topN measure %s: %v", md, err) + } +} + var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { - metadata metadata.Repo - pipeline queue.Queue - omr observability.MetricsRegistry - l *logger.Logger - pm *protector.Memory - path string - option option + metadata metadata.Repo + omr observability.MetricsRegistry + l *logger.Logger + pm *protector.Memory + schemaRepo *schemaRepo + path string + option option } -func newSupplier(path string, svc *service) *supplier { +func newSupplier(path string, svc *service, sr *schemaRepo) *supplier { return &supplier{ - path: path, - metadata: svc.metadata, - l: svc.l, - pipeline: svc.localPipeline, - option: svc.option, - omr: svc.omr, - pm: svc.pm, + path: path, + metadata: svc.metadata, + l: svc.l, + option: svc.option, + omr: svc.omr, + pm: svc.pm, + schemaRepo: sr, } } -func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) { +func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) { measureSchema := spec.Schema().(*databasev1.Measure) - return openMeasure(shardNum, supplier, measureSpec{ - schema: measureSchema, - indexRules: spec.IndexRules(), - topNAggregations: spec.TopN(), - }, s.l, s.pipeline, s.pm) + return openMeasure(measureSpec{ + schema: measureSchema, + }, s.l, s.pm, s.schemaRepo) } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { @@ -348,11 +402,42 @@ func (*portableSupplier) OpenDB(_ *commonv1.Group) (io.Closer, error) { panic("do not support open db") } -func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) { +func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) { measureSchema := spec.Schema().(*databasev1.Measure) - return openMeasure(shardNum, nil, measureSpec{ - schema: measureSchema, - indexRules: spec.IndexRules(), - topNAggregations: spec.TopN(), + return openMeasure(measureSpec{ + schema: measureSchema, }, s.l, nil, nil) } + +// GetTopNSchema returns the schema of the topN result measure. +func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure { + return &databasev1.Measure{ + Metadata: md, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: TopNTagFamily, + Tags: []*databasev1.TagSpec{ + {Name: TopNTagNames[0], Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: TopNTagNames[1], Type: databasev1.TagType_TAG_TYPE_INT}, + {Name: TopNTagNames[2], Type: databasev1.TagType_TAG_TYPE_STRING}, + }, + }, + }, + Fields: topNFieldsSpec, + Entity: &databasev1.Entity{ + TagNames: TopNTagNames, + }, + } +} + +// GetTopNSchemaMetadata returns the metadata of the topN result measure. +func GetTopNSchemaMetadata(group string) *commonv1.Metadata { + return &commonv1.Metadata{ + Name: TopNSchemaName, + Group: group, + } +} + +func getKey(metadata *commonv1.Metadata) string { + return path.Join(metadata.GetGroup(), metadata.GetName()) +} diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 4529c298..ab033187 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -22,7 +22,6 @@ import ( "container/heap" "context" "fmt" - "io" "sort" "github.com/pkg/errors" @@ -56,7 +55,6 @@ type Query interface { // Measure allows inspecting measure data points' details. type Measure interface { - io.Closer Query(ctx context.Context, opts model.MeasureQueryOptions) (model.MeasureQueryResult, error) GetSchema() *databasev1.Measure GetIndexRules() []*databasev1.IndexRule @@ -77,12 +75,18 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 { return nil, errors.New("invalid query options: tagProjection or fieldProjection is required") } - db := s.databaseSupplier.SupplyTSDB() + var tsdb storage.TSDB[*tsTable, option] + db := s.tsdb.Load() if db == nil { - return mqr, nil + tsdb, err = s.schemaRepo.loadTSDB(s.group) + if err != nil { + return nil, err + } + s.tsdb.Store(tsdb) + } else { + tsdb = db.(storage.TSDB[*tsTable, option]) } - tsdb := db.(storage.TSDB[*tsTable, option]) segments := tsdb.SelectSegments(*mqo.TimeRange) if len(segments) < 1 { return nilResult, nil @@ -184,6 +188,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m fieldToValueType := make(map[string]tagNameWithType) var projectedEntityOffsets map[string]int newTagProjection = make([]model.TagProjection, 0) + is := s.indexSchema.Load().(indexSchema) for _, tp := range mqo.TagProjection { var tagProjection model.TagProjection TAG: @@ -197,14 +202,16 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m continue TAG } } - if fields, ok := s.fieldIndexLocation[tp.Family]; ok { - if field, ok := fields[n]; ok { - indexProjection = append(indexProjection, field.Key) - fieldToValueType[field.Key.Marshal()] = tagNameWithType{ - fieldName: n, - typ: field.Type, + if is.fieldIndexLocation != nil { + if fields, ok := is.fieldIndexLocation[tp.Family]; ok { + if field, ok := fields[n]; ok { + indexProjection = append(indexProjection, field.Key) + fieldToValueType[field.Key.Marshal()] = tagNameWithType{ + fieldName: n, + typ: field.Type, + } + continue TAG } - continue TAG } } tagProjection.Family = tp.Family @@ -268,6 +275,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu segments[i].DecRef() } }() + is := s.indexSchema.Load().(indexSchema) r := &indexSortResult{} var indexProjection []index.FieldKey for _, tp := range mqo.TagProjection { @@ -285,14 +293,16 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu continue TAG } } - if fields, ok := s.fieldIndexLocation[tp.Family]; ok { - if field, ok := fields[n]; ok { - indexProjection = append(indexProjection, field.Key) - tagFamilyLocation.fieldToValueType[n] = tagNameWithType{ - fieldName: field.Key.Marshal(), - typ: field.Type, + if is.fieldIndexLocation != nil { + if fields, ok := is.fieldIndexLocation[tp.Family]; ok { + if field, ok := fields[n]; ok { + indexProjection = append(indexProjection, field.Key) + tagFamilyLocation.fieldToValueType[n] = tagNameWithType{ + fieldName: field.Key.Marshal(), + typ: field.Type, + } + continue TAG } - continue TAG } } return nil, fmt.Errorf("tag %s not found in schema", n) diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index 4170aef6..d566d561 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -50,7 +50,6 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/logical" - "github.com/apache/skywalking-banyandb/pkg/schema" ) const ( @@ -65,6 +64,51 @@ var ( _ flow.Sink = (*topNStreamingProcessor)(nil) ) +func (sr *schemaRepo) getSteamingManager(source *commonv1.Metadata, pipeline queue.Queue) (manager *topNProcessorManager) { + key := getKey(source) + sourceMeasure, ok := sr.loadMeasure(source) + if !ok { + m, _ := sr.topNProcessorMap.LoadOrStore(key, &topNProcessorManager{ + l: sr.l, + pipeline: pipeline, + }) + manager = m.(*topNProcessorManager) + return manager + } + + if v, ok := sr.topNProcessorMap.Load(key); ok { + pre := v.(*topNProcessorManager) + pre.init(sourceMeasure) + if pre.m.schema.GetMetadata().GetModRevision() < sourceMeasure.schema.GetMetadata().GetModRevision() { + defer pre.Close() + manager = &topNProcessorManager{ + l: sr.l, + pipeline: pipeline, + } + manager.registeredTasks = append(manager.registeredTasks, pre.registeredTasks...) + } else { + return pre + } + } + if manager == nil { + manager = &topNProcessorManager{ + l: sr.l, + pipeline: pipeline, + } + } + manager.init(sourceMeasure) + sr.topNProcessorMap.Store(key, manager) + return manager +} + +func (sr *schemaRepo) stopSteamingManager(sourceMeasure *commonv1.Metadata) { + key := getKey(sourceMeasure) + if v, ok := sr.topNProcessorMap.Load(key); ok { + v.(*topNProcessorManager).Close() + sr.topNProcessorMap.Delete(key) + } +} + type dataPointWithEntityValues struct { *measurev1.DataPointValue entityValues []*modelv1.TagValue @@ -199,7 +243,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err iwr := &measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ MessageId: uint64(time.Now().UnixNano()), - Metadata: &commonv1.Metadata{Name: schema.TopNSchemaName, Group: t.topNSchema.GetMetadata().Group}, + Metadata: &commonv1.Metadata{Name: TopNSchemaName, Group: t.topNSchema.GetMetadata().Group}, DataPoint: &measurev1.DataPointValue{ Timestamp: timestamppb.New(eventTime), TagFamilies: []*modelv1.TagFamilyForWrite{ @@ -270,94 +314,155 @@ func (t *topNStreamingProcessor) handleError() { // topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure. type topNProcessorManager struct { - l *logger.Logger - pipeline queue.Queue - m *measure - s logical.TagSpecRegistry - processorMap map[*commonv1.Metadata][]*topNStreamingProcessor - topNSchemas []*databasev1.TopNAggregation + l *logger.Logger + pipeline queue.Queue + m *measure + s logical.TagSpecRegistry + registeredTasks []*databasev1.TopNAggregation + processorList []*topNStreamingProcessor sync.RWMutex } +func (manager *topNProcessorManager) init(m *measure) { + manager.Lock() + defer manager.Unlock() + if manager.m != nil { + return + } + manager.m = m + tagMapSpec := logical.TagSpecMap{} + tagMapSpec.RegisterTagFamilies(m.schema.GetTagFamilies()) + for i := range manager.registeredTasks { + if err := manager.start(manager.registeredTasks[i]); err != nil { + manager.l.Err(err).Msg("fail to start processor") + } + } +} + func (manager *topNProcessorManager) Close() error { manager.Lock() defer manager.Unlock() var err error - for _, processorList := range manager.processorMap { - for _, processor := range processorList { - err = multierr.Append(err, processor.Close()) - } + for _, processor := range manager.processorList { + err = multierr.Append(err, processor.Close()) } + manager.processorList = nil + manager.registeredTasks = nil return err } -func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID uint32, request *measurev1.InternalWriteRequest) { +func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID uint32, request *measurev1.InternalWriteRequest, measure *measure) { go func() { manager.RLock() defer manager.RUnlock() - for _, processorList := range manager.processorMap { - for _, processor := range processorList { - processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{ - request.GetRequest().GetDataPoint(), - request.GetEntityValues(), - seriesID, - shardID, - }, request.GetRequest().GetDataPoint().GetTimestamp()) - } + if manager.m == nil { + manager.RUnlock() + manager.init(measure) + manager.RLock() + } + for _, processor := range manager.processorList { + processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{ + request.GetRequest().GetDataPoint(), + request.GetEntityValues(), + seriesID, + shardID, + }, request.GetRequest().GetDataPoint().GetTimestamp()) } }() } -func (manager *topNProcessorManager) start() error { - interval := manager.m.interval - for _, topNSchema := range manager.topNSchemas { - sortDirections := make([]modelv1.Sort, 0, 2) - if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED { - sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC) - } else { - sortDirections = append(sortDirections, topNSchema.GetFieldValueSort()) - } - - processorList := make([]*topNStreamingProcessor, len(sortDirections)) - for i, sortDirection := range sortDirections { - srcCh := make(chan interface{}) - src, _ := sources.NewChannel(srcCh) - name := strings.Join([]string{topNSchema.GetMetadata().Group, topNSchema.GetMetadata().Name, modelv1.Sort_name[int32(sortDirection)]}, "-") - streamingFlow := streaming.New(name, src) - - filters, buildErr := manager.buildFilter(topNSchema.GetCriteria()) - if buildErr != nil { - return buildErr - } - streamingFlow = streamingFlow.Filter(filters) - - mapper, innerErr := manager.buildMapper(topNSchema.GetFieldName(), topNSchema.GetGroupByTagNames()...) - if innerErr != nil { - return innerErr - } - streamingFlow = streamingFlow.Map(mapper) - processor := &topNStreamingProcessor{ - m: manager.m, - l: manager.l, - interval: interval, - topNSchema: topNSchema, - sortDirection: sortDirection, - src: srcCh, - in: make(chan flow.StreamRecord), - stopCh: make(chan struct{}), - streamingFlow: streamingFlow, - pipeline: manager.pipeline, - buf: make([]byte, 0, 64), +func (manager *topNProcessorManager) register(topNSchema *databasev1.TopNAggregation) { + manager.Lock() + defer manager.Unlock() + exist := false + for i := range manager.registeredTasks { + if manager.registeredTasks[i].GetMetadata().GetName() == topNSchema.GetMetadata().GetName() { + exist = true + if manager.registeredTasks[i].GetMetadata().GetModRevision() < topNSchema.GetMetadata().GetModRevision() { + prev := manager.registeredTasks[i] + prevProcessors := manager.removeProcessors(prev) + if err := manager.start(topNSchema); err != nil { + manager.l.Err(err).Msg("fail to start the new processor") + return + } + manager.registeredTasks[i] = topNSchema + for _, processor := range prevProcessors { + if err := processor.Close(); err != nil { + manager.l.Err(err).Msg("fail to close the prev processor") + } + } } - processorList[i] = processor.start() } + } + if exist { + return + } + manager.registeredTasks = append(manager.registeredTasks, topNSchema) + if err := manager.start(topNSchema); err != nil { + manager.l.Err(err).Msg("fail to start processor") + } +} - manager.processorMap[topNSchema.GetSourceMeasure()] = processorList +func (manager *topNProcessorManager) start(topNSchema *databasev1.TopNAggregation) error { + if manager.m == nil { + return nil } + interval := manager.m.interval + sortDirections := make([]modelv1.Sort, 0, 2) + if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED { + sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC) + } else { + sortDirections = append(sortDirections, topNSchema.GetFieldValueSort()) + } + + processorList := make([]*topNStreamingProcessor, len(sortDirections)) + for i, sortDirection := range sortDirections { + srcCh := make(chan interface{}) + src, _ := sources.NewChannel(srcCh) + name := strings.Join([]string{topNSchema.GetMetadata().Group, topNSchema.GetMetadata().Name, modelv1.Sort_name[int32(sortDirection)]}, "-") + streamingFlow := streaming.New(name, src) + + filters, buildErr := manager.buildFilter(topNSchema.GetCriteria()) + if buildErr != nil { + return buildErr + } + streamingFlow = streamingFlow.Filter(filters) + mapper, innerErr := manager.buildMapper(topNSchema.GetFieldName(), topNSchema.GetGroupByTagNames()...) + if innerErr != nil { + return innerErr + } + streamingFlow = streamingFlow.Map(mapper) + processor := &topNStreamingProcessor{ + m: manager.m, + l: manager.l, + interval: interval, + topNSchema: topNSchema, + sortDirection: sortDirection, + src: srcCh, + in: make(chan flow.StreamRecord), + stopCh: make(chan struct{}), + streamingFlow: streamingFlow, + pipeline: manager.pipeline, + buf: make([]byte, 0, 64), + } + processorList[i] = processor.start() + } + manager.processorList = append(manager.processorList, processorList...) return nil } +func (manager *topNProcessorManager) removeProcessors(topNSchema *databasev1.TopNAggregation) []*topNStreamingProcessor { + var processors []*topNStreamingProcessor + for i := range manager.processorList { + if manager.processorList[i].topNSchema.GetMetadata().GetName() == topNSchema.GetMetadata().GetName() { + processors = append(processors, manager.processorList[i]) + manager.processorList = append(manager.processorList[:i], manager.processorList[i+1:]...) + } + } + return processors +} + func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (flow.UnaryFunc[bool], error) { // if criteria is nil, we handle all incoming elements if criteria == nil { @@ -387,7 +492,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames return spec.GetName() == fieldName }) if fieldIdx == -1 { - return nil, errors.New("invalid fieldName") + return nil, fmt.Errorf("field %s is not found in %s schema", fieldName, manager.m.GetSchema().GetMetadata().GetName()) } if len(groupByNames) == 0 { return func(_ context.Context, request any) any { diff --git a/banyand/measure/write.go b/banyand/measure/write.go index da6f8f9d..ba62bc97 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -165,15 +165,15 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me } dpt.dataPoints.fields = append(dpt.dataPoints.fields, field) - if stm.processorManager != nil { - stm.processorManager.onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{ + if p, _ := w.schemaRepo.topNProcessorMap.Load(getKey(stm.schema.GetMetadata())); p != nil { + p.(*topNProcessorManager).onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ Metadata: stm.GetSchema().Metadata, DataPoint: req.DataPoint, MessageId: uint64(time.Now().UnixNano()), }, EntityValues: writeEvent.EntityValues, - }) + }, stm) } doc := index.Document{ @@ -223,10 +223,12 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi func (w *writeCallback) handleTagFamily(stm *measure, req *measurev1.WriteRequest) ([]nameValues, []index.Field) { tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies)) - if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { + is := stm.indexSchema.Load().(indexSchema) + if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", - len(stm.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) + len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) } + var fields []index.Field for i := range stm.GetSchema().GetTagFamilies() { var tagFamily *modelv1.TagFamilyForWrite @@ -235,7 +237,7 @@ func (w *writeCallback) handleTagFamily(stm *measure, req *measurev1.WriteReques } else { tagFamily = req.DataPoint.TagFamilies[i] } - tfr := stm.indexRuleLocators.TagFamilyTRule[i] + tfr := is.indexRuleLocators.TagFamilyTRule[i] tagFamilySpec := stm.GetSchema().GetTagFamilies()[i] tf := nameValues{ name: tagFamilySpec.Name, @@ -283,7 +285,7 @@ func (w *writeCallback) handleTagFamily(stm *measure, req *measurev1.WriteReques } continue } - _, isEntity := stm.indexRuleLocators.EntitySet[t.Name] + _, isEntity := is.indexRuleLocators.EntitySet[t.Name] if tagFamilySpec.Tags[j].IndexedOnly || isEntity { continue } @@ -301,8 +303,9 @@ func (w *writeCallback) appendEntityTagsToIndexFields(fields []index.Field, stm f.Index = true f.NoSort = true fields = append(fields, f) + is := stm.indexSchema.Load().(indexSchema) for i := range stm.schema.Entity.TagNames { - if _, exists := stm.indexTagMap[stm.schema.Entity.TagNames[i]]; exists { + if _, exists := is.indexTagMap[stm.schema.Entity.TagNames[i]]; exists { continue } tagName := stm.schema.Entity.TagNames[i] diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index ba0f57a0..b70336f8 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -62,17 +62,18 @@ func NewClient(forceRegisterNode bool) (Service, error) { } type clientService struct { - schemaRegistry schema.Registry - closer *run.Closer - namespace string - etcdUsername string - etcdPassword string - etcdTLSCAFile string - etcdTLSCertFile string - etcdTLSKeyFile string - endpoints []string - registryTimeout time.Duration - forceRegisterNode bool + schemaRegistry schema.Registry + closer *run.Closer + namespace string + etcdUsername string + etcdPassword string + etcdTLSCAFile string + etcdTLSCertFile string + etcdTLSKeyFile string + endpoints []string + registryTimeout time.Duration + etcdFullSyncInterval time.Duration + forceRegisterNode bool } func (s *clientService) SchemaRegistry() schema.Registry { @@ -89,6 +90,7 @@ func (s *clientService) FlagSet() *run.FlagSet { fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate") fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.") fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry") + fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 30*time.Minute, "The interval for full sync etcd") return fs } @@ -123,6 +125,7 @@ func (s *clientService) PreRun(ctx context.Context) error { schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword), schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile), schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile), + schema.ConfigureWatchCheckInterval(s.etcdFullSyncInterval), ) if errors.Is(err, context.DeadlineExceeded) { select { @@ -193,6 +196,9 @@ func (s *clientService) PreRun(ctx context.Context) error { } func (s *clientService) Serve() run.StopNotify { + if s.schemaRegistry != nil { + s.schemaRegistry.StartWatcher() + } return s.closer.CloseNotify() } diff --git a/banyand/metadata/embeddedserver/server.go b/banyand/metadata/embeddedserver/server.go index 44bd4864..908cbe7b 100644 --- a/banyand/metadata/embeddedserver/server.go +++ b/banyand/metadata/embeddedserver/server.go @@ -81,6 +81,7 @@ func (s *server) PreRun(ctx context.Context) error { } func (s *server) Serve() run.StopNotify { + _ = s.Service.Serve() return s.metaServer.StoppingNotify() } diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 4f2d9ed4..4e3d51e9 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -40,6 +40,11 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) +const ( + minCheckInterval = time.Second * 5 + defaultCheckInterval = time.Minute * 10 +) + var ( _ Stream = (*etcdSchemaRegistry)(nil) _ IndexRuleBinding = (*etcdSchemaRegistry)(nil) @@ -101,13 +106,32 @@ func ConfigureEtcdTLSCertAndKey(certFile string, keyFile string) RegistryOption } } +// ConfigureWatchCheckInterval sets the interval to check the watcher. +func ConfigureWatchCheckInterval(d time.Duration) RegistryOption { + return func(config *etcdSchemaRegistryConfig) { + if d >= minCheckInterval { + config.checkInterval = d + } + } +} + +// CheckInterval sets the interval to check the watcher. +func CheckInterval(d time.Duration) WatcherOption { + return func(wc *watcherConfig) { + if d >= minCheckInterval { + wc.checkInterval = d + } + } +} + type etcdSchemaRegistry struct { - namespace string - client *clientv3.Client - closer *run.Closer - l *logger.Logger - watchers []*watcher - mux sync.RWMutex + client *clientv3.Client + closer *run.Closer + l *logger.Logger + watchers map[Kind]*watcher + namespace string + checkInterval time.Duration + mux sync.RWMutex } type etcdSchemaRegistryConfig struct { @@ -118,6 +142,7 @@ type etcdSchemaRegistryConfig struct { tlsCertFile string tlsKeyFile string serverEndpoints []string + checkInterval time.Duration } func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler EventHandler) { @@ -142,20 +167,42 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve return } for i := range kinds { - e.l.Info().Str("name", name).Stringer("kind", kinds[i]).Msg("registering watcher") - w := e.newWatcherWithRevision(name, kinds[i], revisions[i], handler) - if w != nil { - e.watchers = append(e.watchers, w) - } + e.registerToWatcher(name, kinds[i], revisions[i], handler) } return } for i := range kinds { - e.l.Info().Str("name", name).Stringer("kind", kinds[i]).Msg("registering watcher") - w := e.NewWatcher(name, kinds[i], handler) - if w != nil { - e.watchers = append(e.watchers, w) + e.registerToWatcher(name, kinds[i], 0, handler) + } +} + +func (e *etcdSchemaRegistry) registerToWatcher(name string, kind Kind, revision int64, handler EventHandler) { + if w, ok := e.watchers[kind]; ok { + e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to an existing watcher") + w.AddHandler(handler) + if w.revision > revision { + w.revision = revision } + return + } + e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to a new watcher") + w := e.newWatcherWithRevision(name, kind, revision, CheckInterval(e.checkInterval)) + w.AddHandler(handler) + e.watchers[kind] = w +} + +func (e *etcdSchemaRegistry) Compact(ctx context.Context, revision int64) error { + if !e.closer.AddRunning() { + return ErrClosed + } + defer e.closer.Done() + _, err := e.client.Compact(ctx, revision) + return err +} + +func (e *etcdSchemaRegistry) StartWatcher() { + for _, w := range e.watchers { + w.Start() } } @@ -202,10 +249,12 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) { return nil, err } reg := &etcdSchemaRegistry{ - namespace: registryConfig.namespace, - client: client, - closer: run.NewCloser(1), - l: logger.GetLogger("schema-registry"), + namespace: registryConfig.namespace, + client: client, + closer: run.NewCloser(1), + l: logger.GetLogger("schema-registry"), + checkInterval: registryConfig.checkInterval, + watchers: make(map[Kind]*watcher), } return reg, nil } @@ -525,17 +574,21 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { } } -func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, handler watchEventHandler) *watcher { - return e.newWatcherWithRevision(name, kind, 0, handler) +func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, opts ...WatcherOption) *watcher { + return e.newWatcherWithRevision(name, kind, 0, opts...) } -func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, revision int64, handler watchEventHandler) *watcher { - return newWatcher(e.client, watcherConfig{ - key: e.prependNamespace(kind.key()), - kind: kind, - handler: handler, - revision: revision, - }, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String()))) +func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, revision int64, opts ...WatcherOption) *watcher { + wc := watcherConfig{ + key: e.prependNamespace(kind.key()), + kind: kind, + revision: revision, + checkInterval: 5 * time.Minute, // Default value + } + for _, opt := range opts { + opt(&wc) + } + return newWatcher(e.client, wc, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String()))) } func listPrefixesForEntity(group, entityPrefix string) string { diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index 94d9c324..8f4a8e35 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -51,6 +51,9 @@ type ListOpt struct { Group string } +// WatcherOption is a placeholder for watcher configuration. +type WatcherOption func(*watcherConfig) + // Registry allowing depositing resources. type Registry interface { io.Closer @@ -62,8 +65,10 @@ type Registry interface { TopNAggregation Node RegisterHandler(string, Kind, EventHandler) - NewWatcher(string, Kind, watchEventHandler) *watcher + NewWatcher(string, Kind, ...WatcherOption) *watcher Register(context.Context, Metadata, bool) error + Compact(context.Context, int64) error + StartWatcher() } // TypeMeta defines the identity and type of an Event. diff --git a/banyand/metadata/schema/watcher.go b/banyand/metadata/schema/watcher.go index 2464efe4..3c635237 100644 --- a/banyand/metadata/schema/watcher.go +++ b/banyand/metadata/schema/watcher.go @@ -18,13 +18,16 @@ package schema import ( - "errors" + "context" + "sync" "time" + "github.com/pkg/errors" mvccpb "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -34,69 +37,68 @@ type watchEventHandler interface { OnDelete(Metadata) } type watcherConfig struct { - handler watchEventHandler - key string - revision int64 - kind Kind + key string + revision int64 + kind Kind + checkInterval time.Duration +} + +type cacheEntry struct { + valueHash uint64 // Hash of the value + modRevision int64 // Last modified revision } type watcher struct { - handler watchEventHandler - cli *clientv3.Client - closer *run.Closer - l *logger.Logger - key string - revision int64 - kind Kind + cli *clientv3.Client + closer *run.Closer + l *logger.Logger + ticker *time.Ticker + cache map[string]cacheEntry + key string + handlers []watchEventHandler + revision int64 + kind Kind + checkInterval time.Duration + mu sync.RWMutex + startOnce sync.Once } func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) *watcher { - w := &watcher{ - cli: cli, - key: wc.key, - kind: wc.kind, - handler: wc.handler, - revision: wc.revision, - closer: run.NewCloser(1), - l: l, + if wc.checkInterval == 0 { + wc.checkInterval = 5 * time.Minute } - revision := w.revision - if revision < 1 { - revision = w.allEvents() + w := &watcher{ + cli: cli, + key: wc.key, + kind: wc.kind, + revision: wc.revision, + closer: run.NewCloser(1), + l: l, + checkInterval: wc.checkInterval, + cache: make(map[string]cacheEntry), } - go w.watch(revision) return w } -func (w *watcher) Close() { - w.closer.Done() - w.closer.CloseThenWait() +func (w *watcher) Start() { + w.startOnce.Do(func() { + if w.revision < 1 { + w.periodicSync() + } + go w.watch(w.revision) + }) } -func (w *watcher) allEvents() int64 { - cli := w.cli - var resp *clientv3.GetResponse - start := time.Now() - var eventHandleTime time.Duration - var eventSize int - for { - var err error - if resp, err = cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix()); err == nil { - startHandle := time.Now() - eventSize = len(resp.Kvs) - w.handleAllEvents(resp.Kvs) - eventHandleTime = time.Since(startHandle) - break - } - select { - case <-w.closer.CloseNotify(): - return -1 - case <-time.After(1 * time.Second): - } +func (w *watcher) AddHandler(handler watchEventHandler) { + if w.handlers == nil { + w.handlers = make([]watchEventHandler, 0) } - w.l.Info().Dur("event_handle_time", eventHandleTime).Dur("total_time", time.Since(start)). - Int("event_size", eventSize).Str("key", w.key).Msg("watcher all events") - return resp.Header.Revision + w.handlers = append(w.handlers, handler) +} + +func (w *watcher) Close() { + w.closer.Done() + w.closer.CloseThenWait() } func (w *watcher) watch(revision int64) { @@ -105,17 +107,25 @@ func (w *watcher) watch(revision int64) { } defer w.closer.Done() cli := w.cli + + w.ticker = time.NewTicker(w.checkInterval) + defer w.ticker.Stop() + OUTER: for { - if revision < 0 { - revision = w.allEvents() - } select { case <-w.closer.CloseNotify(): return default: } + if revision < 0 { + // Use periodic sync to recover state and get new revision + w.periodicSync() + revision = w.revision + continue + } + wch := cli.Watch(w.closer.Ctx(), w.key, clientv3.WithPrefix(), clientv3.WithRev(revision+1), @@ -124,39 +134,34 @@ OUTER: if wch == nil { continue } + for { select { case <-w.closer.CloseNotify(): - w.l.Info().Msgf("watcher closed") + w.l.Info().Msg("watcher closed") return + case <-w.ticker.C: + w.periodicSync() case watchResp, ok := <-wch: if !ok { select { case <-w.closer.CloseNotify(): return default: - break OUTER + revision = -1 + continue OUTER } } if err := watchResp.Err(); err != nil { - select { - case <-w.closer.CloseNotify(): - return - default: - if errors.Is(err, v3rpc.ErrCompacted) { - revision = -1 - break - } - continue + if errors.Is(err, v3rpc.ErrCompacted) { + revision = -1 + continue OUTER } + continue } + w.revision = watchResp.Header.Revision for _, event := range watchResp.Events { - select { - case <-w.closer.CloseNotify(): - return - default: - w.handle(event, &watchResp) - } + w.handle(event, &watchResp) } } } @@ -164,31 +169,119 @@ OUTER: } func (w *watcher) handle(watchEvent *clientv3.Event, watchResp *clientv3.WatchResponse) { + keyStr := string(watchEvent.Kv.Key) + entry := cacheEntry{ + valueHash: convert.Hash(watchEvent.Kv.Value), + modRevision: watchEvent.Kv.ModRevision, + } + + handlers := w.handlers + if len(handlers) == 0 { + w.l.Panic().Msg("no handlers registered") + return + } + + w.mu.Lock() + defer w.mu.Unlock() + switch watchEvent.Type { case mvccpb.PUT: - md, err := w.kind.Unmarshal(watchEvent.Kv) - if err != nil { - w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message") - return + if existing, exists := w.cache[keyStr]; !exists || existing.modRevision < entry.modRevision { + w.cache[keyStr] = entry + + md, err := w.kind.Unmarshal(watchEvent.Kv) + if err != nil { + w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message") + return + } + for i := range handlers { + handlers[i].OnAddOrUpdate(md) + } } - w.handler.OnAddOrUpdate(md) case mvccpb.DELETE: + delete(w.cache, keyStr) md, err := w.kind.Unmarshal(watchEvent.PrevKv) if err != nil { w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message") return } - w.handler.OnDelete(md) + for i := range handlers { + handlers[i].OnDelete(md) + } } } -func (w *watcher) handleAllEvents(kvs []*mvccpb.KeyValue) { - for i := 0; i < len(kvs); i++ { - md, err := w.kind.Unmarshal(kvs[i]) - if err != nil { - w.l.Error().AnErr("err", err).Msg("failed to unmarshal message") +func (w *watcher) periodicSync() { + resp, err := w.cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix()) + if err != nil { + if !errors.Is(err, context.Canceled) { + w.l.Error().Err(err).Msg("periodic sync failed to fetch keys") + } + return + } + + currentState := make(map[string]cacheEntry, len(resp.Kvs)) + for _, kv := range resp.Kvs { + currentState[string(kv.Key)] = cacheEntry{ + valueHash: convert.Hash(kv.Value), + modRevision: kv.ModRevision, + } + } + + handlers := w.handlers + if len(handlers) == 0 { + w.l.Panic().Msg("no handlers registered") + return + } + w.mu.Lock() + defer w.mu.Unlock() + + // Detect deletions and changes + for cachedKey, cachedEntry := range w.cache { + currentEntry, exists := currentState[cachedKey] + if !exists { + // Handle deletion + delete(w.cache, cachedKey) + if md, err := w.getFromStore(cachedKey); err == nil { + for i := range handlers { + handlers[i].OnDelete(*md) + } + } continue } - w.handler.OnAddOrUpdate(md) + + if currentEntry.valueHash != cachedEntry.valueHash { + // Handle update + if md, err := w.getFromStore(cachedKey); err == nil { + for i := range handlers { + handlers[i].OnAddOrUpdate(*md) + } + w.cache[cachedKey] = currentEntry + } + } + } + + // Detect additions + for key, entry := range currentState { + if _, exists := w.cache[key]; !exists { + if md, err := w.getFromStore(key); err == nil { + for i := range handlers { + handlers[i].OnAddOrUpdate(*md) + } + w.cache[key] = entry + } + } + } +} + +func (w *watcher) getFromStore(key string) (*Metadata, error) { + resp, err := w.cli.Get(w.closer.Ctx(), key) + if err != nil { + return nil, err + } + if len(resp.Kvs) == 0 { + return nil, errors.New("key not found") } + md, err := w.kind.Unmarshal(resp.Kvs[0]) + return &md, err } diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go index 65c45b40..c2fa90dd 100644 --- a/banyand/metadata/schema/watcher_test.go +++ b/banyand/metadata/schema/watcher_test.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" ginkgo "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -171,7 +172,9 @@ var _ = ginkgo.Describe("Watcher", func() { } // Start the watcher - watcher := registry.NewWatcher("test", schema.KindMeasure, mockedObj) + watcher := registry.NewWatcher("test", schema.KindMeasure) + watcher.AddHandler(mockedObj) + watcher.Start() ginkgo.DeferCleanup(func() { watcher.Close() }) @@ -188,7 +191,9 @@ var _ = ginkgo.Describe("Watcher", func() { }, flags.EventuallyTimeout).Should(gomega.BeTrue()) }) ginkgo.It("should handle watch events", func() { - watcher := registry.NewWatcher("test", schema.KindStream, mockedObj) + watcher := registry.NewWatcher("test", schema.KindStream) + watcher.AddHandler(mockedObj) + watcher.Start() ginkgo.DeferCleanup(func() { watcher.Close() }) @@ -246,4 +251,242 @@ var _ = ginkgo.Describe("Watcher", func() { len(mockedObj.Data()) == 0 }, flags.EventuallyTimeout).Should(gomega.BeTrue()) }) + ginkgo.It("should load initial state and track revisions", func() { + groupName := "testgroup-initial" + err := registry.CreateGroup(context.Background(), &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: groupName, + }, + Catalog: commonv1.Catalog_CATALOG_MEASURE, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{ + Num: 1, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + Ttl: &commonv1.IntervalRule{ + Num: 3, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{ + Metadata: &commonv1.Metadata{ + Name: "initial-key1", + Group: groupName, + }, + Entity: &databasev1.Entity{ + TagNames: []string{"testtag"}, + }, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "testtagfamily", + Tags: []*databasev1.TagSpec{ + { + Name: "testtag", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + }, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{ + Metadata: &commonv1.Metadata{ + Name: "initial-key2", + Group: groupName, + }, + Entity: &databasev1.Entity{ + TagNames: []string{"testtag"}, + }, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "testtagfamily", + Tags: []*databasev1.TagSpec{ + { + Name: "testtag", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + }, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second)) + watcher.AddHandler(mockedObj) + watcher.Start() + ginkgo.DeferCleanup(func() { + watcher.Close() + }) + + gomega.Eventually(func() int { + return len(mockedObj.Data()) + }, flags.EventuallyTimeout).Should(gomega.Equal(2)) + + gomega.Expect(mockedObj.addOrUpdateCalledNum.Load()).To(gomega.Equal(int32(2))) + }) + + ginkgo.It("should detect deletions", func() { + watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second)) + watcher.AddHandler(mockedObj) + watcher.Start() + ginkgo.DeferCleanup(func() { + watcher.Close() + }) + + groupName := "testgroup-delete" + measureName := "delete-key" + err := registry.CreateGroup(context.Background(), &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: groupName, + }, + Catalog: commonv1.Catalog_CATALOG_MEASURE, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{ + Num: 1, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + Ttl: &commonv1.IntervalRule{ + Num: 3, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{ + Metadata: &commonv1.Metadata{ + Name: measureName, + Group: groupName, + }, + Entity: &databasev1.Entity{ + TagNames: []string{"testtag"}, + }, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "testtagfamily", + Tags: []*databasev1.TagSpec{ + { + Name: "testtag", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + }, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + gomega.Eventually(func() bool { + _, ok := mockedObj.Data()[measureName] + return ok + }, flags.EventuallyTimeout).Should(gomega.BeTrue()) + + deleted, err := registry.DeleteMeasure(context.Background(), &commonv1.Metadata{ + Name: measureName, + Group: groupName, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(deleted).To(gomega.BeTrue()) + + gomega.Eventually(func() int { + return int(mockedObj.deleteCalledNum.Load()) + }, 5*time.Second).Should(gomega.Equal(1)) + gomega.Expect(mockedObj.Data()).NotTo(gomega.HaveKey(measureName)) + }) + + ginkgo.It("should recover state after compaction", func() { + watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Hour)) + watcher.AddHandler(mockedObj) + watcher.Start() + ginkgo.DeferCleanup(func() { + watcher.Close() + }) + + groupName := "testgroup-compact" + measureName := "compact-key" + err := registry.CreateGroup(context.Background(), &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: groupName, + }, + Catalog: commonv1.Catalog_CATALOG_MEASURE, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{ + Num: 1, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + Ttl: &commonv1.IntervalRule{ + Num: 3, + Unit: commonv1.IntervalRule_UNIT_DAY, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + modRev, err := registry.CreateMeasure(context.Background(), &databasev1.Measure{ + Metadata: &commonv1.Metadata{ + Name: measureName, + Group: groupName, + }, + Entity: &databasev1.Entity{ + TagNames: []string{"testtag"}, + }, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "testtagfamily", + Tags: []*databasev1.TagSpec{ + { + Name: "testtag", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + }, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + gomega.Eventually(func() bool { + _, ok := mockedObj.Data()[measureName] + return ok + }, flags.EventuallyTimeout).Should(gomega.BeTrue()) + + err = registry.Compact(context.Background(), modRev) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + updatedMeasure := &databasev1.Measure{ + Metadata: &commonv1.Metadata{ + Name: measureName, + Group: groupName, + }, + Entity: &databasev1.Entity{ + TagNames: []string{"testtag"}, + }, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "testtagfamily", + Tags: []*databasev1.TagSpec{ + { + Name: "testtag", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + { + Name: "testtag1", + Type: databasev1.TagType_TAG_TYPE_STRING, + }, + }, + }, + }, + } + _, err = registry.UpdateMeasure(context.Background(), updatedMeasure) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + gomega.Eventually(func() int { + return int(mockedObj.addOrUpdateCalledNum.Load()) + }, 5*time.Second).Should(gomega.BeNumerically(">=", 2)) + }) }) diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go index 0bb22e7b..d765391a 100644 --- a/banyand/query/processor_topn.go +++ b/banyand/query/processor_topn.go @@ -40,7 +40,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/aggregation" "github.com/apache/skywalking-banyandb/pkg/query/executor" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" - pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" ) type topNQueryProcessor struct { @@ -105,7 +104,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("topn plan") } - topNResultMeasure, err := t.measureService.Measure(pkgschema.GetTopNSchemaMetadata(topNMetadata.Group)) + topNResultMeasure, err := t.measureService.Measure(measure.GetTopNSchemaMetadata(topNMetadata.Group)) if err != nil { ml.Error().Err(err).Str("topN", topNMetadata.GetName()).Msg("fail to find topn result measure") return diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index 866bfe88..af3644d8 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -256,10 +256,11 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream { Entity: entity, TagFamilies: []*databasev1.TagFamilySpec{tagFamily}, } - return &stream{ - databaseSupplier: dbSupplier, - schema: schema, + s := &stream{ + schema: schema, } + s.tsdb.Store(db) + return s } func generateStreamQueryOptions(p parameter, midx mockIndex) model.StreamQueryOptions { diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 00c4b337..e4a27945 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -510,9 +510,9 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { tmpBlock.reset() bc.bm.tagProjection = bc.tagProjection var tf map[string]*dataBlock - for i := range bc.tagProjection { + for _, tp := range bc.tagProjection { for tfName, block := range bc.bm.tagFamilies { - if bc.tagProjection[i].Family == tfName { + if tp.Family == tfName { if tf == nil { tf = make(map[string]*dataBlock, len(bc.tagProjection)) } @@ -520,8 +520,15 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { } } } + if len(tf) == 0 { + return false + } + bc.bm.tagFamilies = tf tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm) + if len(tmpBlock.timestamps) == 0 { + return false + } idxList := make([]int, 0) var start, end int diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go index e8a54f03..7e3d89df 100644 --- a/banyand/stream/block_scanner.go +++ b/banyand/stream/block_scanner.go @@ -46,6 +46,7 @@ type blockScanResult struct { func (bs *blockScanResult) reset() { bs.p = nil + bs.qo.reset() bs.bm.reset() } @@ -55,6 +56,7 @@ type blockScanResultBatch struct { } func (bsb *blockScanResultBatch) reset() { + bsb.err = nil for i := range bsb.bss { bsb.bss[i].reset() } diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 07fdd873..8d1bea4a 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -107,7 +107,8 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) { logger.Panicf("invalid kinds: %v", kinds) return false, nil } - return true, sr.Repository.Init(schema.KindStream) + _, revs := sr.Repository.Init(schema.KindStream) + return true, revs } func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { @@ -137,47 +138,29 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { Metadata: metadata.Spec.(*databasev1.Stream), }) case schema.KindIndexRuleBinding: - irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding) - if !ok { - sr.l.Warn().Msg("fail to convert message to IndexRuleBinding") - return - } - if err := validate.IndexRuleBinding(irb); err != nil { - sr.l.Warn().Err(err).Msg("index rule binding is ignored") - return - } - if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - stm, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{ - Name: irb.GetSubject().GetName(), - Group: metadata.Group, - }) - cancel() - if err != nil { + if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok { + if err := validate.IndexRuleBinding(irb); err != nil { + sr.l.Warn().Err(err).Msg("index rule binding is ignored") return } - sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: stm, - }) + if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM { + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventAddOrUpdate, + Kind: resourceSchema.EventKindIndexRuleBinding, + Metadata: irb, + }) + } } case schema.KindIndexRule: - if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil { - sr.l.Warn().Err(err).Msg("index rule is ignored") - return - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM) - if err != nil { - return - } - for _, sub := range subjects { + if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok { + if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil { + sr.l.Warn().Err(err).Msg("index rule is ignored") + return + } sr.SendMetadataEvent(resourceSchema.MetadataEvent{ Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: sub.(*databasev1.Stream), + Kind: resourceSchema.EventKindIndexRule, + Metadata: ir, }) } default: @@ -203,24 +186,23 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) { Metadata: metadata.Spec.(*databasev1.Stream), }) case schema.KindIndexRuleBinding: - if metadata.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - m, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{ - Name: metadata.Name, - Group: metadata.Group, - }) - if err != nil { - return + if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok { + if binding.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE { + sr.SendMetadataEvent(resourceSchema.MetadataEvent{ + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindIndexRuleBinding, + Metadata: metadata.Spec.(*databasev1.IndexRuleBinding), + }) } - // we should update instead of delete + } + case schema.KindIndexRule: + if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok { sr.SendMetadataEvent(resourceSchema.MetadataEvent{ - Typ: resourceSchema.EventAddOrUpdate, - Kind: resourceSchema.EventKindResource, - Metadata: m, + Typ: resourceSchema.EventDelete, + Kind: resourceSchema.EventKindIndexRule, + Metadata: rule, }) } - case schema.KindIndexRule: default: } } @@ -249,33 +231,34 @@ func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option] var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { - metadata metadata.Repo - pipeline queue.Queue - omr observability.MetricsRegistry - l *logger.Logger - pm *protector.Memory - path string - option option + metadata metadata.Repo + pipeline queue.Queue + omr observability.MetricsRegistry + l *logger.Logger + pm *protector.Memory + schemaRepo *schemaRepo + path string + option option } func newSupplier(path string, svc *service) *supplier { return &supplier{ - path: path, - metadata: svc.metadata, - l: svc.l, - pipeline: svc.localPipeline, - option: svc.option, - omr: svc.omr, - pm: svc.pm, + path: path, + metadata: svc.metadata, + l: svc.l, + pipeline: svc.localPipeline, + option: svc.option, + omr: svc.omr, + pm: svc.pm, + schemaRepo: &svc.schemaRepo, } } -func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) { +func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) { streamSchema := spec.Schema().(*databasev1.Stream) - return openStream(shardNum, supplier, streamSpec{ - schema: streamSchema, - indexRules: spec.IndexRules(), - }, s.l, s.pm), nil + return openStream(streamSpec{ + schema: streamSchema, + }, s.l, s.pm, s.schemaRepo), nil } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { @@ -332,10 +315,9 @@ func (*portableSupplier) OpenDB(_ *commonv1.Group) (io.Closer, error) { panic("do not support open db") } -func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) { +func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) { streamSchema := spec.Schema().(*databasev1.Stream) - return openStream(shardNum, nil, streamSpec{ - schema: streamSchema, - indexRules: spec.IndexRules(), - }, s.l, nil), nil + return openStream(streamSpec{ + schema: streamSchema, + }, s.l, nil, nil), nil } diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 24658da3..4ed25ab4 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -40,8 +40,6 @@ import ( const checkDoneEvery = 128 -var nilResult = model.StreamQueryResult(nil) - func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") @@ -49,11 +47,17 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m if len(sqo.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } - db := s.databaseSupplier.SupplyTSDB() + var tsdb storage.TSDB[*tsTable, option] + db := s.tsdb.Load() if db == nil { - return nilResult, nil + tsdb, err = s.schemaRepo.loadTSDB(s.group) + if err != nil { + return nil, err + } + s.tsdb.Store(tsdb) + } else { + tsdb = db.(storage.TSDB[*tsTable, option]) } - tsdb := db.(storage.TSDB[*tsTable, option]) segments := tsdb.SelectSegments(*sqo.TimeRange) if len(segments) < 1 { return bypassQueryResultInstance, nil @@ -144,8 +148,17 @@ type queryOptions struct { maxTimestamp int64 } +func (qo *queryOptions) reset() { + qo.StreamQueryOptions.Reset() + qo.elementFilter = nil + qo.seriesToEntity = nil + qo.sortedSids = nil + qo.minTimestamp = 0 + qo.maxTimestamp = 0 +} + func (qo *queryOptions) copyFrom(other *queryOptions) { - qo.StreamQueryOptions = other.StreamQueryOptions + qo.StreamQueryOptions.CopyFrom(&other.StreamQueryOptions) qo.elementFilter = other.elementFilter qo.seriesToEntity = other.seriesToEntity qo.sortedSids = other.sortedSids diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go index e49e0429..80094ba7 100644 --- a/banyand/stream/query_by_idx.go +++ b/banyand/stream/query_by_idx.go @@ -131,6 +131,7 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes go func(i int) { select { case <-ctx.Done(): + releaseBlockCursor(qr.data[i]) cursorChan <- i return default: @@ -167,7 +168,6 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes return blankCursorList[i] > blankCursorList[j] }) for _, index := range blankCursorList { - releaseBlockCursor(qr.data[index]) qr.data = append(qr.data[:index], qr.data[index+1:]...) } qr.loaded = true diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go index 5f07f2ea..d45a46ef 100644 --- a/banyand/stream/query_by_ts.go +++ b/banyand/stream/query_by_ts.go @@ -48,7 +48,7 @@ type tsResult struct { func (t *tsResult) Pull(ctx context.Context) *model.StreamResult { if len(t.segments) == 0 { - return &model.StreamResult{} + return nil } if err := t.scanSegment(ctx); err != nil { return &model.StreamResult{Error: err} @@ -123,11 +123,11 @@ func (t *tsResult) scanSegment(ctx context.Context) error { blockHeap.Push(bc) } } - releaseBlockScanResultBatch(batch) heap.Init(blockHeap) result := blockHeap.merge(t.qo.MaxElementSize) t.shards[workerID].CopyFrom(tmpResult, result) blockHeap.reset() + releaseBlockScanResultBatch(batch) } workerWg.Done() }(i) @@ -158,13 +158,14 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stre for idx, tagFamily := range bc.tagFamilies { tagFamilyMap[tagFamily.name] = idx + 1 } + is := sm.indexSchema.Load().(indexSchema) for _, tagFamilyProj := range bc.tagProjection { for j, tagProj := range tagFamilyProj.Names { - tagSpec := sm.tagMap[tagProj] + tagSpec := is.tagMap[tagProj] if tagSpec.IndexedOnly { continue } - entityPos := sm.entityMap[tagProj] + entityPos := is.indexRuleLocators.EntitySet[tagProj] if entityPos == 0 { continue } diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 0b27dca6..b3a4756d 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -21,7 +21,7 @@ package stream import ( "context" - "io" + "sync/atomic" "time" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -59,26 +59,38 @@ type Query interface { // Stream allows inspecting elements' details. type Stream interface { - io.Closer GetSchema() *databasev1.Stream GetIndexRules() []*databasev1.IndexRule Query(ctx context.Context, opts model.StreamQueryOptions) (model.StreamQueryResult, error) } -var _ Stream = (*stream)(nil) - -type stream struct { - databaseSupplier schema.Supplier - l *logger.Logger - schema *databasev1.Stream +type indexSchema struct { tagMap map[string]*databasev1.TagSpec - entityMap map[string]int - pm *protector.Memory - name string - group string indexRuleLocators partition.IndexRuleLocator indexRules []*databasev1.IndexRule - shardNum uint32 +} + +func (i *indexSchema) parse(schema *databasev1.Stream) { + i.indexRuleLocators, _ = partition.ParseIndexRuleLocators(schema.GetEntity(), schema.GetTagFamilies(), i.indexRules, false) + i.tagMap = make(map[string]*databasev1.TagSpec) + for _, tf := range schema.GetTagFamilies() { + for _, tag := range tf.GetTags() { + i.tagMap[tag.GetName()] = tag + } + } +} + +var _ Stream = (*stream)(nil) + +type stream struct { + indexSchema atomic.Value + tsdb atomic.Value + l *logger.Logger + schema *databasev1.Stream + pm *protector.Memory + schemaRepo *schemaRepo + name string + group string } func (s *stream) GetSchema() *databasev1.Stream { @@ -86,48 +98,40 @@ func (s *stream) GetSchema() *databasev1.Stream { } func (s *stream) GetIndexRules() []*databasev1.IndexRule { - return s.indexRules + is := s.indexSchema.Load() + if is == nil { + return nil + } + return is.(indexSchema).indexRules } -func (s *stream) Close() error { - return nil +func (s *stream) OnIndexUpdate(index []*databasev1.IndexRule) { + var is indexSchema + is.indexRules = index + is.parse(s.schema) + s.indexSchema.Store(is) } func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() - s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, false) - s.tagMap = make(map[string]*databasev1.TagSpec) - for _, tf := range s.schema.GetTagFamilies() { - for _, tag := range tf.GetTags() { - s.tagMap[tag.GetName()] = tag - } - } - s.entityMap = make(map[string]int) - for idx, entity := range s.schema.GetEntity().GetTagNames() { - s.entityMap[entity] = idx + 1 - } + var is indexSchema + is.parse(s.schema) + s.indexSchema.Store(is) } type streamSpec struct { - schema *databasev1.Stream - indexRules []*databasev1.IndexRule + schema *databasev1.Stream } -func openStream(shardNum uint32, db schema.Supplier, - spec streamSpec, l *logger.Logger, pm *protector.Memory, +func openStream(spec streamSpec, + l *logger.Logger, pm *protector.Memory, schemaRepo *schemaRepo, ) *stream { s := &stream{ - shardNum: shardNum, schema: spec.schema, - indexRules: spec.indexRules, l: l, pm: pm, + schemaRepo: schemaRepo, } s.parseSpec() - if db == nil { - return s - } - - s.databaseSupplier = db return s } diff --git a/banyand/stream/write.go b/banyand/stream/write.go index db995f6b..9a0664ec 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -155,10 +155,12 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre } et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID) + is := stm.indexSchema.Load().(indexSchema) + tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies)) - if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { + if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", - len(stm.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) + len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) } var fields []index.Field for i := range stm.GetSchema().GetTagFamilies() { @@ -168,7 +170,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre } else { tagFamily = req.Element.TagFamilies[i] } - tfr := stm.indexRuleLocators.TagFamilyTRule[i] + tfr := is.indexRuleLocators.TagFamilyTRule[i] tagFamilySpec := stm.GetSchema().GetTagFamilies()[i] tf := tagValues{ tag: tagFamilySpec.Name, @@ -190,7 +192,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre SeriesID: series.ID, }, t.Type, tagValue, r.GetNoSort()) } - _, isEntity := stm.indexRuleLocators.EntitySet[tagFamilySpec.Tags[j].Name] + _, isEntity := is.indexRuleLocators.EntitySet[t.Name] if tagFamilySpec.Tags[j].IndexedOnly || isEntity { continue } diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go index 2f2c82ce..1c797f6c 100644 --- a/bydbctl/internal/cmd/measure_test.go +++ b/bydbctl/internal/cmd/measure_test.go @@ -179,7 +179,8 @@ entity: }) resp := new(databasev1.MeasureRegistryServiceListResponse) helpers.UnmarshalYAML([]byte(out), resp) - Expect(resp.Measure).To(HaveLen(2)) + // There is a _topn_result measure created by default + Expect(resp.Measure).To(HaveLen(3)) }) AfterEach(func() { diff --git a/go.mod b/go.mod index cecdc0b8..8ab8f6f8 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/apache/skywalking-banyandb go 1.23 -toolchain go1.23.1 +toolchain go1.23.5 require ( github.com/RoaringBitmap/roaring v1.9.4 @@ -74,7 +74,7 @@ require ( github.com/blugelabs/bluge_segment_api v0.2.0 github.com/blugelabs/ice v1.0.0 // indirect github.com/caio/go-tdigest v3.1.0+incompatible // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/pkg/partition/index.go b/pkg/partition/index.go index f3c69833..c60203fc 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -25,7 +25,7 @@ import ( // IndexRuleLocator is a helper struct to locate the index rule by tag name. type IndexRuleLocator struct { - EntitySet map[string]struct{} + EntitySet map[string]int TagFamilyTRule []map[string]*databasev1.IndexRule } @@ -42,10 +42,10 @@ type FieldIndexLocation map[string]map[string]FieldWithType func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.TagFamilySpec, indexRules []*databasev1.IndexRule, indexMode bool, ) (locators IndexRuleLocator, fil FieldIndexLocation) { - locators.EntitySet = make(map[string]struct{}, len(entity.TagNames)) + locators.EntitySet = make(map[string]int, len(entity.TagNames)) fil = make(FieldIndexLocation) for i := range entity.TagNames { - locators.EntitySet[entity.TagNames[i]] = struct{}{} + locators.EntitySet[entity.TagNames[i]] = i + 1 } findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule { for i := range indexRules { diff --git a/pkg/query/logical/measure/topn_analyzer.go b/pkg/query/logical/measure/topn_analyzer.go index eed24da5..1a52f4dd 100644 --- a/pkg/query/logical/measure/topn_analyzer.go +++ b/pkg/query/logical/measure/topn_analyzer.go @@ -23,8 +23,8 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/pkg/query/logical" - pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" ) // TopNAnalyze converts logical expressions to executable operation tree represented by Plan. @@ -48,7 +48,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchema *databasev if criteria.GetAgg() != 0 { groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames() - groupByTags := [][]*logical.Tag{logical.NewTags(pkgschema.TopNTagFamily, groupByProjectionTags...)} + groupByTags := [][]*logical.Tag{logical.NewTags(measure.TopNTagFamily, groupByProjectionTags...)} plan = newUnresolvedGroupBy(plan, groupByTags, false) plan = newUnresolvedAggregation(plan, &logical.Field{Name: topNAggSchema.FieldName}, @@ -97,7 +97,7 @@ func buildVirtualSchema(sourceMeasureSchema *databasev1.Measure, fieldName strin Metadata: sourceMeasureSchema.Metadata, TagFamilies: []*databasev1.TagFamilySpec{ { - Name: pkgschema.TopNTagFamily, + Name: measure.TopNTagFamily, Tags: tags, }, }, diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go index ee319af6..6eb19855 100644 --- a/pkg/query/logical/measure/topn_plan_localscan.go +++ b/pkg/query/logical/measure/topn_plan_localscan.go @@ -35,13 +35,12 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/query/model" - pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil) - fieldProjection = []string{pkgschema.TopNFieldName} + fieldProjection = []string{measure.TopNFieldName} ) type unresolvedLocalScan struct { @@ -90,13 +89,13 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) return &localScan{ s: s, options: model.MeasureQueryOptions{ - Name: pkgschema.TopNSchemaName, + Name: measure.TopNSchemaName, TimeRange: &tr, Entities: entities, TagProjection: []model.TagProjection{ { - Family: pkgschema.TopNTagFamily, - Names: pkgschema.TopNTagNames, + Family: measure.TopNTagFamily, + Names: measure.TopNTagNames, }, }, FieldProjection: fieldProjection, @@ -214,7 +213,7 @@ func (ei *topNMIterator) Next() bool { Version: r.Versions[i], } tagFamily := &modelv1.TagFamily{ - Name: pkgschema.TopNTagFamily, + Name: measure.TopNTagFamily, } dp.TagFamilies = append(dp.TagFamilies, tagFamily) for k, entityName := range entityNames { diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index eda67111..d485c6ff 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -79,7 +79,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro default: } if i.result != nil { - return BuildElementsFromStreamResult(ctx, i.result), nil + return BuildElementsFromStreamResult(ctx, i.result) } var orderBy *index.OrderBy if i.order != nil { @@ -104,7 +104,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro if i.result == nil { return nil, nil } - return BuildElementsFromStreamResult(ctx, i.result), nil + return BuildElementsFromStreamResult(ctx, i.result) } func (i *localIndexScan) String() string { @@ -125,10 +125,19 @@ func (i *localIndexScan) Schema() logical.Schema { } // BuildElementsFromStreamResult builds a slice of elements from the given stream query result. -func BuildElementsFromStreamResult(ctx context.Context, result model.StreamQueryResult) (elements []*streamv1.Element) { - r := result.Pull(ctx) - if r == nil { - return nil +func BuildElementsFromStreamResult(ctx context.Context, result model.StreamQueryResult) (elements []*streamv1.Element, err error) { + var r *model.StreamResult + for { + r = result.Pull(ctx) + if r == nil { + return nil, nil + } + if r.Error != nil { + return nil, r.Error + } + if len(r.Timestamps) > 0 { + break + } } for i := range r.Timestamps { e := &streamv1.Element{ @@ -150,5 +159,5 @@ func BuildElementsFromStreamResult(ctx context.Context, result model.StreamQuery } elements = append(elements, e) } - return elements + return elements, nil } diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index e40f2581..7a31cf1b 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -91,6 +91,44 @@ type StreamQueryOptions struct { MaxElementSize int } +// Reset resets the StreamQueryOptions. +func (s *StreamQueryOptions) Reset() { + s.Name = "" + s.TimeRange = nil + s.Entities = nil + s.Filter = nil + s.Order = nil + s.TagProjection = nil + s.MaxElementSize = 0 +} + +// CopyFrom copies the StreamQueryOptions from other to s. +func (s *StreamQueryOptions) CopyFrom(other *StreamQueryOptions) { + s.Name = other.Name + s.TimeRange = other.TimeRange + + // Deep copy for Entities if it's a slice + if other.Entities != nil { + s.Entities = make([][]*modelv1.TagValue, len(other.Entities)) + copy(s.Entities, other.Entities) + } else { + s.Entities = nil + } + + s.Filter = other.Filter + s.Order = other.Order + + // Deep copy if TagProjection is a slice + if other.TagProjection != nil { + s.TagProjection = make([]TagProjection, len(other.TagProjection)) + copy(s.TagProjection, other.TagProjection) + } else { + s.TagProjection = nil + } + + s.MaxElementSize = other.MaxElementSize +} + // StreamResult is the result of a query. type StreamResult struct { Error error @@ -201,6 +239,8 @@ func (sr *StreamResult) CopySingleFrom(other *StreamResult) { } } +var bypassStreamResult = &StreamResult{} + // StreamResultHeap is a min-heap of StreamResult pointers. type StreamResultHeap struct { data []*StreamResult @@ -242,6 +282,10 @@ func MergeStreamResults(results []*StreamResult, topN int, asc bool) *StreamResu } } + if h.Len() == 0 { + return bypassStreamResult + } + mergedResult := NewStreamResult(topN, asc) for h.Len() > 0 && mergedResult.Len() < topN { diff --git a/pkg/query/model/model_test.go b/pkg/query/model/model_test.go index 500757b8..92247e61 100644 --- a/pkg/query/model/model_test.go +++ b/pkg/query/model/model_test.go @@ -560,9 +560,9 @@ func TestMergeStreamResults(t *testing.T) { results: []*StreamResult{NewStreamResult(3, true), NewStreamResult(3, true)}, topN: 3, asc: true, - wantTS: []int64{}, - wantTags: []TagFamily{}, - wantSIDs: []common.SeriesID{}, + wantTS: nil, + wantTags: nil, + wantSIDs: nil, }, { name: "one with data, one empty", diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 05c59d81..0f8e9615 100644 --- a/pkg/schema/cache.go +++ b/pkg/schema/cache.go @@ -41,53 +41,24 @@ import ( var _ Resource = (*resourceSpec)(nil) type resourceSpec struct { - schema ResourceSchema - delegated io.Closer - indexRules []*databasev1.IndexRule - aggregations []*databasev1.TopNAggregation + schema ResourceSchema + delegated IndexListener } -func (rs *resourceSpec) Delegated() io.Closer { +func (rs *resourceSpec) Delegated() IndexListener { return rs.delegated } -func (rs *resourceSpec) Close() error { - return rs.delegated.Close() -} - func (rs *resourceSpec) Schema() ResourceSchema { return rs.schema } -func (rs *resourceSpec) IndexRules() []*databasev1.IndexRule { - return rs.indexRules -} - -func (rs *resourceSpec) TopN() []*databasev1.TopNAggregation { - return rs.aggregations -} - func (rs *resourceSpec) maxRevision() int64 { return rs.schema.GetMetadata().GetModRevision() } func (rs *resourceSpec) isNewThan(other *resourceSpec) bool { - if other.maxRevision() > rs.maxRevision() { - return false - } - if len(rs.indexRules) != len(other.indexRules) { - return false - } - if len(rs.aggregations) != len(other.aggregations) { - return false - } - if parseMaxModRevision(other.indexRules) > parseMaxModRevision(rs.indexRules) { - return false - } - if parseMaxModRevision(other.aggregations) > parseMaxModRevision(rs.aggregations) { - return false - } - return true + return other.maxRevision() <= rs.maxRevision() } const maxWorkerNum = 8 @@ -112,6 +83,9 @@ type schemaRepo struct { metrics *Metrics groupMap sync.Map resourceMap sync.Map + indexRuleMap sync.Map + bindingForwardMap sync.Map + bindingBackwardMap sync.Map workerNum int resourceMutex sync.Mutex groupMux sync.Mutex @@ -200,25 +174,50 @@ func (sr *schemaRepo) Watcher() { switch evt.Kind { case EventKindGroup: _, err = sr.storeGroup(evt.Metadata.GetMetadata()) - case EventKindResource: - err = sr.initResource(evt.Metadata.GetMetadata()) - case EventKindTopNAgg: - topNSchema := evt.Metadata.(*databasev1.TopNAggregation) - err = createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), topNSchema.GetMetadata().Group) - if err != nil { - break + if errors.As(err, schema.ErrGRPCResourceNotFound) { + err = nil } - err = sr.initResource(topNSchema.SourceMeasure) + case EventKindResource: + err = sr.storeResource(evt.Metadata) + case EventKindIndexRule: + indexRule := evt.Metadata.(*databasev1.IndexRule) + sr.storeIndexRule(indexRule) + case EventKindIndexRuleBinding: + indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) + sr.storeIndexRuleBinding(indexRuleBinding) } case EventDelete: switch evt.Kind { case EventKindGroup: err = sr.deleteGroup(evt.Metadata.GetMetadata()) case EventKindResource: - err = sr.deleteResource(evt.Metadata.GetMetadata()) - case EventKindTopNAgg: - topNSchema := evt.Metadata.(*databasev1.TopNAggregation) - err = sr.initResource(topNSchema.SourceMeasure) + sr.deleteResource(evt.Metadata.GetMetadata()) + case EventKindIndexRule: + key := getKey(evt.Metadata.GetMetadata()) + sr.indexRuleMap.Delete(key) + case EventKindIndexRuleBinding: + indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) + col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Subject.GetName(), + Group: indexRuleBinding.GetMetadata().GetGroup(), + })) + if col == nil { + break + } + tMap := col.(*sync.Map) + key := getKey(indexRuleBinding.GetMetadata()) + tMap.Delete(key) + for i := range indexRuleBinding.Rules { + col, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Rules[i], + Group: indexRuleBinding.GetMetadata().GetGroup(), + })) + if col == nil { + continue + } + tMap := col.(*sync.Map) + tMap.Delete(key) + } } } if err != nil && !errors.Is(err, schema.ErrClosed) { @@ -320,17 +319,13 @@ func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) return s.(Resource), true } -func (sr *schemaRepo) storeResource(g Group, stm ResourceSchema, - idxRules []*databasev1.IndexRule, topNAggrs []*databasev1.TopNAggregation, -) error { +func (sr *schemaRepo) storeResource(resourceSchema ResourceSchema) error { sr.resourceMutex.Lock() defer sr.resourceMutex.Unlock() resource := &resourceSpec{ - schema: stm, - indexRules: idxRules, - aggregations: topNAggrs, + schema: resourceSchema, } - key := getKey(stm.GetMetadata()) + key := getKey(resourceSchema.GetMetadata()) pre, loadedPre := sr.resourceMap.Load(key) var preResource *resourceSpec if loadedPre { @@ -339,71 +334,119 @@ func (sr *schemaRepo) storeResource(g Group, stm ResourceSchema, if loadedPre && preResource.isNewThan(resource) { return nil } - var dbSupplier Supplier - if !g.(*group).isPortable() { - dbSupplier = g - } - sm, err := sr.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, dbSupplier, resource) + sm, err := sr.resourceSchemaSupplier.OpenResource(resource) if err != nil { return errors.WithMessage(err, "fails to open the resource") } + sm.OnIndexUpdate(sr.indexRules(resourceSchema)) resource.delegated = sm sr.resourceMap.Store(key, resource) - if loadedPre { - return preResource.Close() - } return nil } -func getKey(metadata *commonv1.Metadata) string { - return path.Join(metadata.GetGroup(), metadata.GetName()) +func (sr *schemaRepo) storeIndexRule(indexRule *databasev1.IndexRule) { + key := getKey(indexRule.GetMetadata()) + if prev, loaded := sr.indexRuleMap.LoadOrStore(key, indexRule); loaded { + if prev.(*databasev1.IndexRule).GetMetadata().ModRevision <= indexRule.GetMetadata().ModRevision { + sr.indexRuleMap.Store(key, indexRule) + if col, _ := sr.bindingBackwardMap.Load(key); col != nil { + col.(*sync.Map).Range(func(_, value any) bool { + sr.updateIndex(value.(*databasev1.IndexRuleBinding)) + return true + }) + } + } + } else { + if col, _ := sr.bindingBackwardMap.Load(key); col != nil { + col.(*sync.Map).Range(func(_, value any) bool { + sr.updateIndex(value.(*databasev1.IndexRuleBinding)) + return true + }) + } + } } -func (sr *schemaRepo) initResource(metadata *commonv1.Metadata) error { - g, ok := sr.LoadGroup(metadata.Group) - if !ok { - var err error - if g, err = sr.storeGroup(&commonv1.Metadata{Name: metadata.Group}); err != nil { - return errors.WithMessagef(err, "create unknown group:%s", metadata.Group) +func (sr *schemaRepo) storeIndexRuleBinding(indexRuleBinding *databasev1.IndexRuleBinding) { + var changed bool + col, _ := sr.bindingForwardMap.LoadOrStore(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Subject.GetName(), + Group: indexRuleBinding.GetMetadata().GetGroup(), + }), &sync.Map{}) + tMap := col.(*sync.Map) + key := getKey(indexRuleBinding.GetMetadata()) + if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); loaded { + if prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= indexRuleBinding.GetMetadata().ModRevision { + tMap.Store(key, indexRuleBinding) + changed = true } - } - stm, err := sr.resourceSchemaSupplier.ResourceSchema(metadata) - if err != nil { - if errors.Is(err, schema.ErrGRPCResourceNotFound) { - if dl := sr.l.Debug(); dl.Enabled() { - dl.Interface("metadata", metadata).Msg("resource not found") + } else { + changed = true + } + for i := range indexRuleBinding.Rules { + col, _ := sr.bindingBackwardMap.LoadOrStore(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Rules[i], + Group: indexRuleBinding.GetMetadata().GetGroup(), + }), &sync.Map{}) + tMap := col.(*sync.Map) + key := getKey(indexRuleBinding.GetMetadata()) + if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); loaded { + if prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= indexRuleBinding.GetMetadata().ModRevision { + tMap.Store(key, indexRuleBinding) + changed = true } - return nil + } else { + changed = true } - return errors.WithMessage(err, "fails to get the resource") } - ctx := context.Background() - localCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - idxRules, err := sr.metadata.IndexRules(localCtx, stm.GetMetadata()) - if err != nil { - return err + if !changed { + return } - var topNAggrs []*databasev1.TopNAggregation - if _, ok := stm.(*databasev1.Measure); ok { - localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) - var innerErr error - topNAggrs, innerErr = sr.metadata.MeasureRegistry().TopNAggregations(localCtx, stm.GetMetadata()) - cancel() - if innerErr != nil { - return errors.WithMessage(innerErr, "fails to get the topN aggregations") - } + sr.updateIndex(indexRuleBinding) +} + +func (sr *schemaRepo) updateIndex(binding *databasev1.IndexRuleBinding) { + if r, ok := sr.LoadResource(&commonv1.Metadata{ + Name: binding.Subject.GetName(), + Group: binding.GetMetadata().GetGroup(), + }); ok { + r.Delegated().OnIndexUpdate(sr.indexRules(r.Schema())) } - return sr.storeResource(g, stm, idxRules, topNAggrs) } -func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error { - key := getKey(metadata) - pre, loaded := sr.resourceMap.LoadAndDelete(key) - if !loaded { +func (sr *schemaRepo) indexRules(schema ResourceSchema) []*databasev1.IndexRule { + n := schema.GetMetadata().GetName() + g := schema.GetMetadata().GetGroup() + col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{ + Name: n, + Group: g, + })) + if col == nil { return nil } - return pre.(Resource).Close() + tMap := col.(*sync.Map) + var indexRules []*databasev1.IndexRule + tMap.Range(func(_, value any) bool { + indexRuleBinding := value.(*databasev1.IndexRuleBinding) + for i := range indexRuleBinding.Rules { + if r, _ := sr.indexRuleMap.Load(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Rules[i], + Group: g, + })); r != nil { + indexRules = append(indexRules, r.(*databasev1.IndexRule)) + } + } + return true + }) + return indexRules +} + +func getKey(metadata *commonv1.Metadata) string { + return path.Join(metadata.GetGroup(), metadata.GetName()) +} + +func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) { + key := getKey(metadata) + _, _ = sr.resourceMap.LoadAndDelete(key) } func (sr *schemaRepo) Close() { @@ -415,23 +458,6 @@ func (sr *schemaRepo) Close() { sr.closer.CloseThenWait() close(sr.eventCh) - sr.resourceMutex.Lock() - sr.resourceMap.Range(func(_, value any) bool { - if value == nil { - return true - } - r, ok := value.(*resourceSpec) - if !ok { - return true - } - err := r.Close() - if err != nil { - sr.l.Err(err).RawJSON("resource", logger.Proto(r.Schema().GetMetadata())).Msg("closing") - } - return true - }) - sr.resourceMutex.Unlock() - sr.groupMux.Lock() defer sr.groupMux.Unlock() sr.groupMap.Range(func(_, value any) bool { @@ -523,13 +549,3 @@ func (g *group) close() (err error) { } return multierr.Append(err, g.SupplyTSDB().Close()) } - -func parseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { - maxRevisionForIdxRules = int64(0) - for _, idxRule := range indexRules { - if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules { - maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision() - } - } - return -} diff --git a/pkg/schema/init.go b/pkg/schema/init.go index 5050e4bc..8cbe0a12 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -22,32 +22,12 @@ import ( "fmt" "time" - "github.com/pkg/errors" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/logger" ) -const ( - // TopNTagFamily is the tag family name of the topN result measure. - TopNTagFamily = "_topN" - // TopNFieldName is the field name of the topN result measure. - TopNFieldName = "value" -) - -var ( - initTimeout = 10 * time.Second - topNFieldsSpec = []*databasev1.FieldSpec{{ - Name: TopNFieldName, - FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY, - EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, - CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, - }} - // TopNTagNames is the tag names of the topN result measure. - TopNTagNames = []string{"name", "direction", "group"} -) +var initTimeout = 10 * time.Second type revisionContext struct { group int64 @@ -67,18 +47,19 @@ type revisionContextKey struct{} var revCtxKey = revisionContextKey{} -func (sr *schemaRepo) Init(kind schema.Kind) []int64 { +func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) { if kind != schema.KindMeasure && kind != schema.KindStream { - return nil + return nil, nil } catalog := sr.getCatalog(kind) ctx := context.Background() groups, err := sr.metadata.GroupRegistry().ListGroup(ctx) if err != nil { logger.Panicf("fails to get the groups: %v", err) - return nil + return nil, nil } var revCtx revisionContext + groupNames := make([]string, 0, len(groups)) for _, g := range groups { if g.Catalog != catalog { continue @@ -87,13 +68,14 @@ func (sr *schemaRepo) Init(kind schema.Kind) []int64 { revCtx.group = g.Metadata.ModRevision } sr.processGroup(context.WithValue(ctx, revCtxKey, &revCtx), g, catalog) + groupNames = append(groupNames, g.Metadata.Name) } if kind == schema.KindMeasure { sr.l.Info().Stringer("revision", revCtx).Msg("init measures") - return []int64{revCtx.group, revCtx.measure, revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg} + return groupNames, []int64{revCtx.group, revCtx.measure, revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg} } sr.l.Info().Stringer("revision", revCtx).Msg("init stream") - return []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, revCtx.indexRule} + return groupNames, []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, revCtx.indexRule} } func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog { @@ -104,20 +86,20 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog { } func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, catalog commonv1.Catalog) { - group, err := sr.initGroup(g) + _, err := sr.initGroup(g) if err != nil { logger.Panicf("fails to init the group: %v", err) } - bindings := sr.getBindings(ctx, g.Metadata.GetName()) - rules := sr.getRules(ctx, g.Metadata.GetName()) + sr.processRules(ctx, g.Metadata.GetName()) + sr.processBindings(ctx, g.Metadata.GetName()) if catalog == commonv1.Catalog_CATALOG_MEASURE { - sr.processMeasure(ctx, g.Metadata.Name, group, bindings, rules) + sr.processMeasure(ctx, g.Metadata.Name) return } - sr.processStream(ctx, g.Metadata.Name, group, bindings, rules) + sr.processStream(ctx, g.Metadata.Name) } -func (sr *schemaRepo) getBindings(ctx context.Context, gName string) map[string][]string { +func (sr *schemaRepo) processBindings(ctx context.Context, gName string) { ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() start := time.Now() @@ -126,18 +108,16 @@ func (sr *schemaRepo) getBindings(ctx context.Context, gName string) map[string] logger.Panicf("fails to get the index rule bindings: %v", err) } revCtx := ctx.Value(revCtxKey).(*revisionContext) - bindings := make(map[string][]string, len(ibb)) for _, ib := range ibb { - bindings[ib.GetSubject().GetName()] = ib.GetRules() + sr.storeIndexRuleBinding(ib) if ib.Metadata.ModRevision > revCtx.indexRuleBinding { revCtx.indexRuleBinding = ib.Metadata.ModRevision } } - sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(bindings)).Msg("get index rule bindings") - return bindings + sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(ibb)).Msg("get index rule bindings") } -func (sr *schemaRepo) getRules(ctx context.Context, gName string) map[string]*databasev1.IndexRule { +func (sr *schemaRepo) processRules(ctx context.Context, gName string) { ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() start := time.Now() @@ -146,26 +126,16 @@ func (sr *schemaRepo) getRules(ctx context.Context, gName string) map[string]*da logger.Panicf("fails to get the index rules: %v", err) } revCtx := ctx.Value(revCtxKey).(*revisionContext) - rules := make(map[string]*databasev1.IndexRule, len(rr)) for _, r := range rr { - rules[r.Metadata.Name] = r + sr.storeIndexRule(r) if r.Metadata.ModRevision > revCtx.indexRule { revCtx.indexRule = r.Metadata.ModRevision } } - sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(rules)).Msg("get index rules") - return rules + sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(rr)).Msg("get index rules") } -func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group *group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) { - aggMap := sr.getAggMap(ctx, gName) - if len(aggMap) > 0 { - if err := createTopNResultMeasure(ctx, sr.metadata.MeasureRegistry(), gName); err != nil { - logger.Panicf("fails to create the topN result measure: %v", err) - return - } - } - +func (sr *schemaRepo) processMeasure(ctx context.Context, gName string) { ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() start := time.Now() @@ -180,58 +150,14 @@ func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group *g if m.Metadata.ModRevision > revCtx.measure { revCtx.measure = m.Metadata.ModRevision } - sr.storeMeasure(m, group, bindings, rules, aggMap) - } - sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(mm)).Msg("store measures") -} - -func (sr *schemaRepo) getAggMap(ctx context.Context, gName string) map[string][]*databasev1.TopNAggregation { - ctx, cancel := context.WithTimeout(ctx, initTimeout) - defer cancel() - start := time.Now() - agg, err := sr.metadata.TopNAggregationRegistry().ListTopNAggregation(ctx, schema.ListOpt{Group: gName}) - if err != nil { - logger.Panicf("fails to get the topN aggregations: %v", err) - } - revCtx := ctx.Value(revCtxKey).(*revisionContext) - aggMap := make(map[string][]*databasev1.TopNAggregation, len(agg)) - for _, a := range agg { - aggs, ok := aggMap[a.SourceMeasure.Name] - if ok { - aggs = append(aggs, a) - } else { - aggs = []*databasev1.TopNAggregation{a} - } - aggMap[a.SourceMeasure.Name] = aggs - if a.Metadata.ModRevision > revCtx.topNAgg { - revCtx.topNAgg = a.Metadata.ModRevision - } - } - sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(agg)).Msg("get topN aggregations") - return aggMap -} - -func (sr *schemaRepo) storeMeasure(m *databasev1.Measure, group *group, bindings map[string][]string, - rules map[string]*databasev1.IndexRule, aggMap map[string][]*databasev1.TopNAggregation, -) { - var indexRules []*databasev1.IndexRule - if rr, ok := bindings[m.Metadata.GetName()]; ok { - for _, r := range rr { - if rule, ok := rules[r]; ok { - indexRules = append(indexRules, rule) - } + if err := sr.storeResource(m); err != nil { + logger.Panicf("fails to store the measure: %v", err) } } - var topNAggr []*databasev1.TopNAggregation - if aa, ok := aggMap[m.Metadata.GetName()]; ok { - topNAggr = append(topNAggr, aa...) - } - if err := sr.storeResource(group, m, indexRules, topNAggr); err != nil { - logger.Panicf("fails to store the measure: %v", err) - } + sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(mm)).Msg("store measures") } -func (sr *schemaRepo) processStream(ctx context.Context, gName string, group *group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) { +func (sr *schemaRepo) processStream(ctx context.Context, gName string) { ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() start := time.Now() @@ -242,7 +168,9 @@ func (sr *schemaRepo) processStream(ctx context.Context, gName string, group *gr } revCtx := ctx.Value(revCtxKey).(*revisionContext) for _, s := range ss { - sr.storeStream(s, group, bindings, rules) + if err := sr.storeResource(s); err != nil { + logger.Panicf("fails to store the stream: %v", err) + } if s.Metadata.ModRevision > revCtx.stream { revCtx.stream = s.Metadata.ModRevision } @@ -250,20 +178,6 @@ func (sr *schemaRepo) processStream(ctx context.Context, gName string, group *gr sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(ss)).Msg("store streams") } -func (sr *schemaRepo) storeStream(s *databasev1.Stream, group *group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) { - var indexRules []*databasev1.IndexRule - if rr, ok := bindings[s.Metadata.GetName()]; ok { - for _, r := range rr { - if rule, ok := rules[r]; ok { - indexRules = append(indexRules, rule) - } - } - } - if err := sr.storeResource(group, s, indexRules, nil); err != nil { - logger.Panicf("fails to store the stream: %v", err) - } -} - func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) { g, ok := sr.getGroup(groupSchema.Metadata.Name) if ok { @@ -276,51 +190,3 @@ func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) { } return g, nil } - -func createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) error { - md := GetTopNSchemaMetadata(group) - m, err := measureSchemaRegistry.GetMeasure(ctx, md) - if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) { - return errors.WithMessagef(err, "fail to get %s", md) - } - if m != nil { - return nil - } - - m = GetTopNSchema(md) - if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr != nil { - if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) { - return errors.WithMessagef(innerErr, "fail to create new topN measure %s", m) - } - } - return nil -} - -// GetTopNSchema returns the schema of the topN result measure. -func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure { - return &databasev1.Measure{ - Metadata: md, - TagFamilies: []*databasev1.TagFamilySpec{ - { - Name: TopNTagFamily, - Tags: []*databasev1.TagSpec{ - {Name: TopNTagNames[0], Type: databasev1.TagType_TAG_TYPE_STRING}, - {Name: TopNTagNames[1], Type: databasev1.TagType_TAG_TYPE_INT}, - {Name: TopNTagNames[2], Type: databasev1.TagType_TAG_TYPE_STRING}, - }, - }, - }, - Fields: topNFieldsSpec, - Entity: &databasev1.Entity{ - TagNames: TopNTagNames, - }, - } -} - -// GetTopNSchemaMetadata returns the metadata of the topN result measure. -func GetTopNSchemaMetadata(group string) *commonv1.Metadata { - return &commonv1.Metadata{ - Name: TopNSchemaName, - Group: group, - } -} diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go index 9415539d..50c455b9 100644 --- a/pkg/schema/schema.go +++ b/pkg/schema/schema.go @@ -26,9 +26,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) -// TopNSchemaName is the name of the top n result schema. -const TopNSchemaName = "_top_n_result" - // EventType defines actions of events. type EventType uint8 @@ -46,7 +43,8 @@ type EventKind uint8 const ( EventKindGroup EventKind = iota EventKindResource - EventKindTopNAgg + EventKindIndexRule + EventKindIndexRuleBinding ) // Group is the root node, allowing get resources from its sub nodes. @@ -69,11 +67,8 @@ type ResourceSchema interface { // Resource allows access metadata from a local cache. type Resource interface { - io.Closer - IndexRules() []*databasev1.IndexRule - TopN() []*databasev1.TopNAggregation Schema() ResourceSchema - Delegated() io.Closer + Delegated() IndexListener } // Supplier allows open a tsdb. @@ -81,10 +76,15 @@ type Supplier interface { SupplyTSDB() io.Closer } +// IndexListener listens to the index update. +type IndexListener interface { + OnIndexUpdate(index []*databasev1.IndexRule) +} + // ResourceSchemaSupplier allows get a ResourceSchema from the metadata. type ResourceSchemaSupplier interface { ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error) - OpenResource(shardNum uint32, supplier Supplier, spec Resource) (io.Closer, error) + OpenResource(spec Resource) (IndexListener, error) } // ResourceSupplier allows open a resource and its embedded tsdb. @@ -102,7 +102,7 @@ type DB interface { // Repository is the collection of several hierarchies groups by a "Group". type Repository interface { Watcher() - Init(schema.Kind) []int64 + Init(schema.Kind) ([]string, []int64) SendMetadataEvent(MetadataEvent) LoadGroup(name string) (Group, bool) LoadResource(metadata *commonv1.Metadata) (Resource, bool) diff --git a/scripts/ci/check/version_test.go b/scripts/ci/check/version_test.go index 26555a59..a5855cb5 100644 --- a/scripts/ci/check/version_test.go +++ b/scripts/ci/check/version_test.go @@ -31,7 +31,7 @@ import ( ) const ( - GoVersion = "1.23.0" + GoVersion = "1.23.5" CPUType = 8 ) diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go index a28c145e..36f03901 100644 --- a/test/integration/standalone/cold_query/query_suite_test.go +++ b/test/integration/standalone/cold_query/query_suite_test.go @@ -89,7 +89,9 @@ var _ = SynchronizedAfterSuite(func() { Expect(connection.Close()).To(Succeed()) } }, func() { - deferFunc() + if deferFunc != nil { + deferFunc() + } Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index 765cd893..664f4071 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -90,7 +90,9 @@ var _ = SynchronizedAfterSuite(func() { Expect(connection.Close()).To(Succeed()) } }, func() { - deferFunc() + if deferFunc != nil { + deferFunc() + } Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) })