This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b63ff71 Periodic sync to sync the metadata from the etcd (#600)
4b63ff71 is described below

commit 4b63ff7183e2165714a329683a64d2af72e384d1
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Feb 2 13:20:15 2025 +0800

    Periodic sync to sync the metadata from the etcd (#600)
---
 .github/workflows/e2e.storage.yml                  |  11 +
 CHANGES.md                                         |   1 +
 banyand/measure/block.go                           |  14 +-
 banyand/measure/measure.go                         | 103 +++-----
 banyand/measure/metadata.go                        | 285 +++++++++++++--------
 banyand/measure/query.go                           |  48 ++--
 banyand/measure/topn.go                            | 235 ++++++++++++-----
 banyand/measure/write.go                           |  19 +-
 banyand/metadata/client.go                         |  28 +-
 banyand/metadata/embeddedserver/server.go          |   1 +
 banyand/metadata/schema/etcd.go                    | 109 ++++++--
 banyand/metadata/schema/schema.go                  |   7 +-
 banyand/metadata/schema/watcher.go                 | 257 +++++++++++++------
 banyand/metadata/schema/watcher_test.go            | 247 +++++++++++++++++-
 banyand/query/processor_topn.go                    |   3 +-
 banyand/stream/benchmark_test.go                   |   7 +-
 banyand/stream/block.go                            |  11 +-
 banyand/stream/block_scanner.go                    |   2 +
 banyand/stream/metadata.go                         | 130 ++++------
 banyand/stream/query.go                            |  25 +-
 banyand/stream/query_by_idx.go                     |   2 +-
 banyand/stream/query_by_ts.go                      |   9 +-
 banyand/stream/stream.go                           |  80 +++---
 banyand/stream/write.go                            |  10 +-
 bydbctl/internal/cmd/measure_test.go               |   3 +-
 go.mod                                             |   4 +-
 pkg/partition/index.go                             |   6 +-
 pkg/query/logical/measure/topn_analyzer.go         |   6 +-
 pkg/query/logical/measure/topn_plan_localscan.go   |  11 +-
 .../logical/stream/stream_plan_indexscan_local.go  |  23 +-
 pkg/query/model/model.go                           |  44 ++++
 pkg/query/model/model_test.go                      |   6 +-
 pkg/schema/cache.go                                | 264 ++++++++++---------
 pkg/schema/init.go                                 | 188 ++------------
 pkg/schema/schema.go                               |  20 +-
 scripts/ci/check/version_test.go                   |   2 +-
 .../standalone/cold_query/query_suite_test.go      |   4 +-
 .../standalone/query/query_suite_test.go           |   4 +-
 38 files changed, 1386 insertions(+), 843 deletions(-)
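
The heart of this change set is a periodic full sync of schema metadata from etcd
into the local cache, so a cache that missed watch events (for example after a
dropped connection or a compacted revision) is eventually reconciled. As a rough,
self-contained illustration only, and not the code added in
banyand/metadata/schema/watcher.go, a ticker-driven re-list over an etcd key
prefix could look like the sketch below; the syncer type, its prefix field, and
the apply callback are invented for this example.

    package syncsketch

    import (
        "context"
        "time"

        "go.etcd.io/etcd/api/v3/mvccpb"
        clientv3 "go.etcd.io/etcd/client/v3"
    )

    // syncer periodically re-lists a key prefix and hands the full snapshot to
    // an apply callback that rebuilds the local cache.
    type syncer struct {
        cli      *clientv3.Client
        prefix   string
        interval time.Duration
        apply    func(kvs []*mvccpb.KeyValue)
    }

    func (s *syncer) run(ctx context.Context) {
        ticker := time.NewTicker(s.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // A full GET over the prefix recovers entries whose watch
                // events were lost; errors are retried on the next tick.
                resp, err := s.cli.Get(ctx, s.prefix, clientv3.WithPrefix())
                if err != nil {
                    continue
                }
                s.apply(resp.Kvs)
            }
        }
    }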

diff --git a/.github/workflows/e2e.storage.yml b/.github/workflows/e2e.storage.yml
index c6fd07e2..b815ffac 100644
--- a/.github/workflows/e2e.storage.yml
+++ b/.github/workflows/e2e.storage.yml
@@ -96,6 +96,17 @@ jobs:
         uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
         with:
           e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
+      - if: ${{ failure() }}
+        run: |
+          df -h
+          du -sh .
+          docker images
+      - uses: actions/upload-artifact@v4
+        if: ${{ failure() }}
+        name: Upload Logs
+        with:
+          name: test-logs-${{ matrix.test.name }}
+          path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
 
   Storage:
     if: (github.event_name == 'schedule' && github.repository == 'apache/skywalking-banyandb') || (github.event_name != 'schedule')
diff --git a/CHANGES.md b/CHANGES.md
index 34656715..1db15cdf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
 - Metadata: Wait for the existing registration to be removed before registering the node.
 - Stream: Introduce the batch scan to improve the performance of the query and limit the memory usage.
 - Add memory protector to protect the memory usage of the system. It will limit the memory usage of the querying.
+- Metadata: Introduce the periodic sync to sync the metadata from the etcd to the local cache in case of the loss of the events.
 
 ### Bug Fixes
 
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 37ce7a3b..80f6ca00 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -692,24 +692,24 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        tmpBlock.reset()
        cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
 NEXT_FIELD:
-       for j := range bc.fieldProjection {
-               for i := range bc.bm.field.columnMetadata {
-                       if bc.bm.field.columnMetadata[i].name == 
bc.fieldProjection[j] {
-                               cfm = append(cfm, bc.bm.field.columnMetadata[i])
+       for _, fp := range bc.fieldProjection {
+               for _, cm := range bc.bm.field.columnMetadata {
+                       if cm.name == fp {
+                               cfm = append(cfm, cm)
                                continue NEXT_FIELD
                        }
                }
                cfm = append(cfm, columnMetadata{
-                       name:      bc.fieldProjection[j],
+                       name:      fp,
                        valueType: pbv1.ValueTypeUnknown,
                })
        }
        bc.bm.field.columnMetadata = cfm
        bc.bm.tagProjection = bc.tagProjection
        var tf map[string]*dataBlock
-       for i := range bc.tagProjection {
+       for _, tp := range bc.tagProjection {
                for tfName, block := range bc.bm.tagFamilies {
-                       if bc.tagProjection[i].Family == tfName {
+                       if tp.Family == tfName {
                                if tf == nil {
                                        tf = make(map[string]*dataBlock, 
len(bc.tagProjection))
                                }
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 797bf83c..3edc7127 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -21,17 +21,14 @@
 package measure
 
 import (
+       "sync/atomic"
        "time"
 
-       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/protector"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
-       "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -52,40 +49,33 @@ type option struct {
        seriesCacheMaxSize run.Bytes
 }
 
-type measure struct {
-       databaseSupplier   schema.Supplier
-       pm                 *protector.Memory
+type indexSchema struct {
        indexTagMap        map[string]struct{}
-       l                  *logger.Logger
-       schema             *databasev1.Measure
-       processorManager   *topNProcessorManager
        fieldIndexLocation partition.FieldIndexLocation
-       name               string
-       group              string
        indexRuleLocators  partition.IndexRuleLocator
        indexRules         []*databasev1.IndexRule
-       topNAggregations   []*databasev1.TopNAggregation
-       interval           time.Duration
-       shardNum           uint32
 }
 
-func (s *measure) startSteamingManager(pipeline queue.Queue) error {
-       if len(s.topNAggregations) == 0 {
-               return nil
-       }
-       tagMapSpec := logical.TagSpecMap{}
-       tagMapSpec.RegisterTagFamilies(s.schema.GetTagFamilies())
-
-       s.processorManager = &topNProcessorManager{
-               l:            s.l,
-               pipeline:     pipeline,
-               m:            s,
-               s:            tagMapSpec,
-               topNSchemas:  s.topNAggregations,
-               processorMap: 
make(map[*commonv1.Metadata][]*topNStreamingProcessor),
+func (i *indexSchema) parse(schema *databasev1.Measure) {
+       i.indexRuleLocators, i.fieldIndexLocation = 
partition.ParseIndexRuleLocators(schema.GetEntity(), schema.GetTagFamilies(), 
i.indexRules, schema.IndexMode)
+       i.indexTagMap = make(map[string]struct{})
+       for j := range i.indexRules {
+               for k := range i.indexRules[j].Tags {
+                       i.indexTagMap[i.indexRules[j].Tags[k]] = struct{}{}
+               }
        }
+}
 
-       return s.processorManager.start()
+type measure struct {
+       indexSchema atomic.Value
+       tsdb        atomic.Value
+       pm          *protector.Memory
+       l           *logger.Logger
+       schema      *databasev1.Measure
+       schemaRepo  *schemaRepo
+       name        string
+       group       string
+       interval    time.Duration
 }
 
 func (s *measure) GetSchema() *databasev1.Measure {
@@ -93,14 +83,18 @@ func (s *measure) GetSchema() *databasev1.Measure {
 }
 
 func (s *measure) GetIndexRules() []*databasev1.IndexRule {
-       return s.indexRules
-}
-
-func (s *measure) Close() error {
-       if s.processorManager == nil {
+       is := s.indexSchema.Load()
+       if is == nil {
                return nil
        }
-       return s.processorManager.Close()
+       return is.(indexSchema).indexRules
+}
+
+func (s *measure) OnIndexUpdate(index []*databasev1.IndexRule) {
+       var is indexSchema
+       is.indexRules = index
+       is.parse(s.schema)
+       s.indexSchema.Store(is)
 }
 
 func (s *measure) parseSpec() (err error) {
@@ -108,44 +102,27 @@ func (s *measure) parseSpec() (err error) {
        if s.schema.Interval != "" {
                s.interval, err = timestamp.ParseDuration(s.schema.Interval)
        }
-       s.indexRuleLocators, s.fieldIndexLocation = 
partition.ParseIndexRuleLocators(s.schema.GetEntity(), 
s.schema.GetTagFamilies(), s.indexRules, s.schema.IndexMode)
-       s.indexTagMap = make(map[string]struct{})
-       for j := range s.indexRules {
-               for k := range s.indexRules[j].Tags {
-                       s.indexTagMap[s.indexRules[j].Tags[k]] = struct{}{}
-               }
-       }
+       var is indexSchema
+       is.parse(s.schema)
+       s.indexSchema.Store(is)
        return err
 }
 
 type measureSpec struct {
-       schema           *databasev1.Measure
-       indexRules       []*databasev1.IndexRule
-       topNAggregations []*databasev1.TopNAggregation
+       schema *databasev1.Measure
 }
 
-func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec,
-       l *logger.Logger, pipeline queue.Queue, pm *protector.Memory,
+func openMeasure(spec measureSpec,
+       l *logger.Logger, pm *protector.Memory, schemaRepo *schemaRepo,
 ) (*measure, error) {
        m := &measure{
-               shardNum:         shardNum,
-               schema:           spec.schema,
-               indexRules:       spec.indexRules,
-               topNAggregations: spec.topNAggregations,
-               l:                l,
-               pm:               pm,
+               schema:     spec.schema,
+               l:          l,
+               pm:         pm,
+               schemaRepo: schemaRepo,
        }
        if err := m.parseSpec(); err != nil {
                return nil, err
        }
-       if db == nil {
-               return m, nil
-       }
-
-       m.databaseSupplier = db
-       if startErr := m.startSteamingManager(pipeline); startErr != nil {
-               l.Err(startErr).Str("measure", 
spec.schema.GetMetadata().GetName()).
-                       Msg("fail to start streaming manager")
-       }
        return m, nil
 }
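
For context on the measure.go changes above: the index-related fields move out of
the measure struct into an indexSchema value held in an atomic.Value, so readers
get one consistent snapshot while OnIndexUpdate publishes a replacement without
taking a lock. A minimal, self-contained sketch of that read-mostly pattern, with
invented names (ruleSet, holder), might look like this:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type ruleSet struct {
        rules []string
    }

    type holder struct {
        current atomic.Value // always stores a ruleSet value
    }

    // update publishes a whole new snapshot, as OnIndexUpdate does.
    func (h *holder) update(rules []string) {
        var rs ruleSet
        rs.rules = rules
        h.current.Store(rs)
    }

    // read returns the latest snapshot, tolerating "nothing published yet",
    // which mirrors the nil check in GetIndexRules.
    func (h *holder) read() []string {
        v := h.current.Load()
        if v == nil {
            return nil
        }
        return v.(ruleSet).rules
    }

    func main() {
        var h holder
        h.update([]string{"service_id", "endpoint_id"})
        fmt.Println(h.read())
    }
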
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 229aa371..b5b8f1cf 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -22,9 +22,12 @@ import (
        "fmt"
        "io"
        "path"
+       "sync"
        "time"
 
+       "github.com/cenkalti/backoff/v4"
        "github.com/pkg/errors"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -40,7 +43,27 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
-var metadataScope = measureScope.SubScope("metadata")
+const (
+       // TopNSchemaName is the name of the top n result schema.
+       TopNSchemaName = "_top_n_result"
+       // TopNTagFamily is the tag family name of the topN result measure.
+       TopNTagFamily = "_topN"
+       // TopNFieldName is the field name of the topN result measure.
+       TopNFieldName = "value"
+)
+
+var (
+       metadataScope = measureScope.SubScope("metadata")
+
+       topNFieldsSpec = []*databasev1.FieldSpec{{
+               Name:              TopNFieldName,
+               FieldType:         databasev1.FieldType_FIELD_TYPE_DATA_BINARY,
+               EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+               CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+       }}
+       // TopNTagNames is the tag names of the topN result measure.
+       TopNTagNames = []string{"name", "direction", "group"}
+)
 
 // SchemaService allows querying schema information.
 type SchemaService interface {
@@ -49,9 +72,11 @@ type SchemaService interface {
 }
 type schemaRepo struct {
        resourceSchema.Repository
-       l        *logger.Logger
-       metadata metadata.Repo
-       path     string
+       metadata         metadata.Repo
+       pipeline         queue.Queue
+       l                *logger.Logger
+       topNProcessorMap sync.Map
+       path             string
 }
 
 func newSchemaRepo(path string, svc *service) *schemaRepo {
@@ -59,13 +84,14 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
                path:     path,
                l:        svc.l,
                metadata: svc.metadata,
-               Repository: resourceSchema.NewRepository(
-                       svc.metadata,
-                       svc.l,
-                       newSupplier(path, svc),
-                       resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
-               ),
+               pipeline: svc.localPipeline,
        }
+       sr.Repository = resourceSchema.NewRepository(
+               svc.metadata,
+               svc.l,
+               newSupplier(path, svc, sr),
+               resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
+       )
        sr.start()
        return sr
 }
@@ -106,7 +132,11 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, 
[]int64) {
                logger.Panicf("unexpected kinds: %v", kinds)
                return false, nil
        }
-       return true, sr.Repository.Init(schema.KindMeasure)
+       groupNames, revs := sr.Repository.Init(schema.KindMeasure)
+       for i := range groupNames {
+               sr.createTopNResultMeasure(context.Background(), 
sr.metadata.MeasureRegistry(), groupNames[i])
+       }
+       return true, revs
 }
 
 func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
@@ -125,70 +155,52 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata 
schema.Metadata) {
                        Kind:     resourceSchema.EventKindGroup,
                        Metadata: g,
                })
+               sr.createTopNResultMeasure(context.Background(), 
sr.metadata.MeasureRegistry(), g.Metadata.Name)
        case schema.KindMeasure:
-               if err := 
validate.Measure(metadata.Spec.(*databasev1.Measure)); err != nil {
+               m := metadata.Spec.(*databasev1.Measure)
+               if err := validate.Measure(m); err != nil {
                        sr.l.Warn().Err(err).Msg("measure is ignored")
                        return
                }
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                        Typ:      resourceSchema.EventAddOrUpdate,
                        Kind:     resourceSchema.EventKindResource,
-                       Metadata: metadata.Spec.(*databasev1.Measure),
+                       Metadata: m,
                })
        case schema.KindIndexRuleBinding:
-               irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding)
-               if !ok {
-                       sr.l.Warn().Msg("fail to convert message to 
IndexRuleBinding")
-                       return
-               }
-               if err := validate.IndexRuleBinding(irb); err != nil {
-                       sr.l.Warn().Err(err).Msg("index rule binding is 
ignored")
-                       return
-               }
-               if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE 
{
-                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
-                       stm, err := 
sr.metadata.MeasureRegistry().GetMeasure(ctx, &commonv1.Metadata{
-                               Name:  irb.GetSubject().GetName(),
-                               Group: metadata.Group,
-                       })
-                       cancel()
-                       if err != nil {
+               if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
+                       if err := validate.IndexRuleBinding(irb); err != nil {
+                               sr.l.Warn().Err(err).Msg("index rule binding is 
ignored")
                                return
                        }
-                       sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                               Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: stm,
-                       })
+                       if irb.GetSubject().Catalog == 
commonv1.Catalog_CATALOG_MEASURE {
+                               
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                                       Typ:      
resourceSchema.EventAddOrUpdate,
+                                       Kind:     
resourceSchema.EventKindIndexRuleBinding,
+                                       Metadata: irb,
+                               })
+                       }
                }
        case schema.KindIndexRule:
-               if err := 
validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil {
-                       sr.l.Warn().Err(err).Msg("index rule is ignored")
-                       return
-               }
-               ctx, cancel := context.WithTimeout(context.Background(), 
5*time.Second)
-               defer cancel()
-               subjects, err := sr.metadata.Subjects(ctx, 
metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE)
-               if err != nil {
-                       return
-               }
-               for _, sub := range subjects {
+               if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok {
+                       if err := 
validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil {
+                               sr.l.Warn().Err(err).Msg("index rule is 
ignored")
+                               return
+                       }
                        sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                                Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: sub.(*databasev1.Measure),
+                               Kind:     resourceSchema.EventKindIndexRule,
+                               Metadata: ir,
                        })
                }
        case schema.KindTopNAggregation:
-               if err := 
validate.TopNAggregation(metadata.Spec.(*databasev1.TopNAggregation)); err != 
nil {
+               topNSchema := metadata.Spec.(*databasev1.TopNAggregation)
+               if err := validate.TopNAggregation(topNSchema); err != nil {
                        sr.l.Warn().Err(err).Msg("topNAggregation is ignored")
                        return
                }
-               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                       Typ:      resourceSchema.EventAddOrUpdate,
-                       Kind:     resourceSchema.EventKindTopNAgg,
-                       Metadata: metadata.Spec.(*databasev1.TopNAggregation),
-               })
+               manager := sr.getSteamingManager(topNSchema.SourceMeasure, 
sr.pipeline)
+               manager.register(topNSchema)
        default:
        }
 }
@@ -206,40 +218,52 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                        Metadata: g,
                })
        case schema.KindMeasure:
+               m := metadata.Spec.(*databasev1.Measure)
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                        Typ:      resourceSchema.EventDelete,
                        Kind:     resourceSchema.EventKindResource,
-                       Metadata: metadata.Spec.(*databasev1.Measure),
+                       Metadata: m,
                })
+               sr.stopSteamingManager(m.GetMetadata())
        case schema.KindIndexRuleBinding:
-               if 
metadata.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == 
commonv1.Catalog_CATALOG_MEASURE {
-                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
-                       defer cancel()
-                       m, err := sr.metadata.MeasureRegistry().GetMeasure(ctx, 
&commonv1.Metadata{
-                               Name:  metadata.Name,
-                               Group: metadata.Group,
-                       })
-                       if err != nil {
-                               return
+               if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); 
ok {
+                       if binding.GetSubject().Catalog == 
commonv1.Catalog_CATALOG_MEASURE {
+                               
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                                       Typ:      resourceSchema.EventDelete,
+                                       Kind:     
resourceSchema.EventKindIndexRuleBinding,
+                                       Metadata: 
metadata.Spec.(*databasev1.IndexRuleBinding),
+                               })
                        }
-                       // we should update instead of delete
+               }
+
+       case schema.KindIndexRule:
+               if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok {
                        sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                               Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: m,
+                               Typ:      resourceSchema.EventDelete,
+                               Kind:     resourceSchema.EventKindIndexRule,
+                               Metadata: rule,
                        })
                }
-       case schema.KindIndexRule:
        case schema.KindTopNAggregation:
-               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                       Typ:      resourceSchema.EventAddOrUpdate,
-                       Kind:     resourceSchema.EventKindTopNAgg,
-                       Metadata: metadata.Spec.(*databasev1.TopNAggregation),
-               })
+               topNAggregation := metadata.Spec.(*databasev1.TopNAggregation)
+               sr.stopSteamingManager(topNAggregation.SourceMeasure)
        default:
        }
 }
 
+func (sr *schemaRepo) Close() {
+       var err error
+       sr.topNProcessorMap.Range(func(_, val any) bool {
+               manager := val.(*topNProcessorManager)
+               err = multierr.Append(err, manager.Close())
+               return true
+       })
+       if err != nil {
+               sr.l.Error().Err(err).Msg("faced error when closing schema 
repository")
+       }
+       sr.Repository.Close()
+}
+
 func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, 
bool) {
        r, ok := sr.LoadResource(metadata)
        if !ok {
@@ -250,6 +274,9 @@ func (sr *schemaRepo) loadMeasure(metadata 
*commonv1.Metadata) (*measure, bool)
 }
 
 func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, 
option], error) {
+       if sr == nil {
+               return nil, fmt.Errorf("schemaRepo is nil")
+       }
        g, ok := sr.LoadGroup(groupName)
        if !ok {
                return nil, fmt.Errorf("group %s not found", groupName)
@@ -261,37 +288,64 @@ func (sr *schemaRepo) loadTSDB(groupName string) 
(storage.TSDB[*tsTable, option]
        return db.(storage.TSDB[*tsTable, option]), nil
 }
 
+func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, 
measureSchemaRegistry schema.Measure, group string) {
+       md := GetTopNSchemaMetadata(group)
+       operation := func() error {
+               m, err := measureSchemaRegistry.GetMeasure(ctx, md)
+               if err != nil && !errors.Is(err, 
schema.ErrGRPCResourceNotFound) {
+                       return errors.WithMessagef(err, "fail to get %s", md)
+               }
+               if m != nil {
+                       return nil
+               }
+
+               m = GetTopNSchema(md)
+               if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); 
innerErr != nil {
+                       if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
+                               return errors.WithMessagef(innerErr, "fail to 
create new topN measure %s", m)
+                       }
+               }
+               return nil
+       }
+
+       backoffStrategy := backoff.NewExponentialBackOff()
+       backoffStrategy.MaxElapsedTime = 2 * time.Minute
+
+       err := backoff.Retry(operation, backoffStrategy)
+       if err != nil {
+               logger.Panicf("fail to create topN measure %s: %v", md, err)
+       }
+}
+
 var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
-       metadata metadata.Repo
-       pipeline queue.Queue
-       omr      observability.MetricsRegistry
-       l        *logger.Logger
-       pm       *protector.Memory
-       path     string
-       option   option
+       metadata   metadata.Repo
+       omr        observability.MetricsRegistry
+       l          *logger.Logger
+       pm         *protector.Memory
+       schemaRepo *schemaRepo
+       path       string
+       option     option
 }
 
-func newSupplier(path string, svc *service) *supplier {
+func newSupplier(path string, svc *service, sr *schemaRepo) *supplier {
        return &supplier{
-               path:     path,
-               metadata: svc.metadata,
-               l:        svc.l,
-               pipeline: svc.localPipeline,
-               option:   svc.option,
-               omr:      svc.omr,
-               pm:       svc.pm,
+               path:       path,
+               metadata:   svc.metadata,
+               l:          svc.l,
+               option:     svc.option,
+               omr:        svc.omr,
+               pm:         svc.pm,
+               schemaRepo: sr,
        }
 }
 
-func (s *supplier) OpenResource(shardNum uint32, supplier 
resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+func (s *supplier) OpenResource(spec resourceSchema.Resource) 
(resourceSchema.IndexListener, error) {
        measureSchema := spec.Schema().(*databasev1.Measure)
-       return openMeasure(shardNum, supplier, measureSpec{
-               schema:           measureSchema,
-               indexRules:       spec.IndexRules(),
-               topNAggregations: spec.TopN(),
-       }, s.l, s.pipeline, s.pm)
+       return openMeasure(measureSpec{
+               schema: measureSchema,
+       }, s.l, s.pm, s.schemaRepo)
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
@@ -348,11 +402,42 @@ func (*portableSupplier) OpenDB(_ *commonv1.Group) 
(io.Closer, error) {
        panic("do not support open db")
 }
 
-func (s *portableSupplier) OpenResource(shardNum uint32, _ 
resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) 
(resourceSchema.IndexListener, error) {
        measureSchema := spec.Schema().(*databasev1.Measure)
-       return openMeasure(shardNum, nil, measureSpec{
-               schema:           measureSchema,
-               indexRules:       spec.IndexRules(),
-               topNAggregations: spec.TopN(),
+       return openMeasure(measureSpec{
+               schema: measureSchema,
        }, s.l, nil, nil)
 }
+
+// GetTopNSchema returns the schema of the topN result measure.
+func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: md,
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {
+                               Name: TopNTagFamily,
+                               Tags: []*databasev1.TagSpec{
+                                       {Name: TopNTagNames[0], Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: TopNTagNames[1], Type: 
databasev1.TagType_TAG_TYPE_INT},
+                                       {Name: TopNTagNames[2], Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               },
+                       },
+               },
+               Fields: topNFieldsSpec,
+               Entity: &databasev1.Entity{
+                       TagNames: TopNTagNames,
+               },
+       }
+}
+
+// GetTopNSchemaMetadata returns the metadata of the topN result measure.
+func GetTopNSchemaMetadata(group string) *commonv1.Metadata {
+       return &commonv1.Metadata{
+               Name:  TopNSchemaName,
+               Group: group,
+       }
+}
+
+func getKey(metadata *commonv1.Metadata) string {
+       return path.Join(metadata.GetGroup(), metadata.GetName())
+}
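
createTopNResultMeasure above retries the get-or-create of the _top_n_result
measure with an exponential backoff capped at two minutes. A minimal usage sketch
of github.com/cenkalti/backoff/v4 under that assumption, with a hypothetical
ensureSchema function standing in for the registry calls:

    package main

    import (
        "errors"
        "fmt"
        "time"

        "github.com/cenkalti/backoff/v4"
    )

    var attempts int

    // ensureSchema stands in for the get-or-create call; it fails twice and
    // then succeeds, just to show the retry behaviour.
    func ensureSchema() error {
        attempts++
        if attempts < 3 {
            return errors.New("etcd not ready")
        }
        return nil
    }

    func main() {
        strategy := backoff.NewExponentialBackOff()
        strategy.MaxElapsedTime = 2 * time.Minute // the commit gives up after two minutes

        if err := backoff.Retry(ensureSchema, strategy); err != nil {
            fmt.Println("fail to create topN measure:", err)
            return
        }
        fmt.Println("topN measure ensured after", attempts, "attempts")
    }
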
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 4529c298..ab033187 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -22,7 +22,6 @@ import (
        "container/heap"
        "context"
        "fmt"
-       "io"
        "sort"
 
        "github.com/pkg/errors"
@@ -56,7 +55,6 @@ type Query interface {
 
 // Measure allows inspecting measure data points' details.
 type Measure interface {
-       io.Closer
        Query(ctx context.Context, opts model.MeasureQueryOptions) 
(model.MeasureQueryResult, error)
        GetSchema() *databasev1.Measure
        GetIndexRules() []*databasev1.IndexRule
@@ -77,12 +75,18 @@ func (s *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
        if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
                return nil, errors.New("invalid query options: tagProjection or 
fieldProjection is required")
        }
-       db := s.databaseSupplier.SupplyTSDB()
+       var tsdb storage.TSDB[*tsTable, option]
+       db := s.tsdb.Load()
        if db == nil {
-               return mqr, nil
+               tsdb, err = s.schemaRepo.loadTSDB(s.group)
+               if err != nil {
+                       return nil, err
+               }
+               s.tsdb.Store(tsdb)
+       } else {
+               tsdb = db.(storage.TSDB[*tsTable, option])
        }
 
-       tsdb := db.(storage.TSDB[*tsTable, option])
        segments := tsdb.SelectSegments(*mqo.TimeRange)
        if len(segments) < 1 {
                return nilResult, nil
@@ -184,6 +188,7 @@ func (s *measure) searchSeriesList(ctx context.Context, 
series []*pbv1.Series, m
        fieldToValueType := make(map[string]tagNameWithType)
        var projectedEntityOffsets map[string]int
        newTagProjection = make([]model.TagProjection, 0)
+       is := s.indexSchema.Load().(indexSchema)
        for _, tp := range mqo.TagProjection {
                var tagProjection model.TagProjection
        TAG:
@@ -197,14 +202,16 @@ func (s *measure) searchSeriesList(ctx context.Context, 
series []*pbv1.Series, m
                                        continue TAG
                                }
                        }
-                       if fields, ok := s.fieldIndexLocation[tp.Family]; ok {
-                               if field, ok := fields[n]; ok {
-                                       indexProjection = 
append(indexProjection, field.Key)
-                                       fieldToValueType[field.Key.Marshal()] = 
tagNameWithType{
-                                               fieldName: n,
-                                               typ:       field.Type,
+                       if is.fieldIndexLocation != nil {
+                               if fields, ok := 
is.fieldIndexLocation[tp.Family]; ok {
+                                       if field, ok := fields[n]; ok {
+                                               indexProjection = 
append(indexProjection, field.Key)
+                                               
fieldToValueType[field.Key.Marshal()] = tagNameWithType{
+                                                       fieldName: n,
+                                                       typ:       field.Type,
+                                               }
+                                               continue TAG
                                        }
-                                       continue TAG
                                }
                        }
                        tagProjection.Family = tp.Family
@@ -268,6 +275,7 @@ func (s *measure) buildIndexQueryResult(ctx 
context.Context, mqo model.MeasureQu
                        segments[i].DecRef()
                }
        }()
+       is := s.indexSchema.Load().(indexSchema)
        r := &indexSortResult{}
        var indexProjection []index.FieldKey
        for _, tp := range mqo.TagProjection {
@@ -285,14 +293,16 @@ func (s *measure) buildIndexQueryResult(ctx 
context.Context, mqo model.MeasureQu
                                        continue TAG
                                }
                        }
-                       if fields, ok := s.fieldIndexLocation[tp.Family]; ok {
-                               if field, ok := fields[n]; ok {
-                                       indexProjection = 
append(indexProjection, field.Key)
-                                       tagFamilyLocation.fieldToValueType[n] = 
tagNameWithType{
-                                               fieldName: field.Key.Marshal(),
-                                               typ:       field.Type,
+                       if is.fieldIndexLocation != nil {
+                               if fields, ok := 
is.fieldIndexLocation[tp.Family]; ok {
+                                       if field, ok := fields[n]; ok {
+                                               indexProjection = 
append(indexProjection, field.Key)
+                                               
tagFamilyLocation.fieldToValueType[n] = tagNameWithType{
+                                                       fieldName: 
field.Key.Marshal(),
+                                                       typ:       field.Type,
+                                               }
+                                               continue TAG
                                        }
-                                       continue TAG
                                }
                        }
                        return nil, fmt.Errorf("tag %s not found in schema", n)
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 4170aef6..d566d561 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -50,7 +50,6 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
-       "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 const (
@@ -65,6 +64,51 @@ var (
        _ flow.Sink = (*topNStreamingProcessor)(nil)
 )
 
+func (sr *schemaRepo) getSteamingManager(source *commonv1.Metadata, pipeline 
queue.Queue) (manager *topNProcessorManager) {
+       key := getKey(source)
+       sourceMeasure, ok := sr.loadMeasure(source)
+       if !ok {
+               m, _ := sr.topNProcessorMap.LoadOrStore(key, 
&topNProcessorManager{
+                       l:        sr.l,
+                       pipeline: pipeline,
+               })
+               manager = m.(*topNProcessorManager)
+               return manager
+       }
+
+       if v, ok := sr.topNProcessorMap.Load(key); ok {
+               pre := v.(*topNProcessorManager)
+               pre.init(sourceMeasure)
+               if pre.m.schema.GetMetadata().GetModRevision() < 
sourceMeasure.schema.GetMetadata().GetModRevision() {
+                       defer pre.Close()
+                       manager = &topNProcessorManager{
+                               l:        sr.l,
+                               pipeline: pipeline,
+                       }
+                       manager.registeredTasks = 
append(manager.registeredTasks, pre.registeredTasks...)
+               } else {
+                       return pre
+               }
+       }
+       if manager == nil {
+               manager = &topNProcessorManager{
+                       l:        sr.l,
+                       pipeline: pipeline,
+               }
+       }
+       manager.init(sourceMeasure)
+       sr.topNProcessorMap.Store(key, manager)
+       return manager
+}
+
+func (sr *schemaRepo) stopSteamingManager(sourceMeasure *commonv1.Metadata) {
+       key := getKey(sourceMeasure)
+       if v, ok := sr.topNProcessorMap.Load(key); ok {
+               v.(*topNProcessorManager).Close()
+               sr.topNProcessorMap.Delete(key)
+       }
+}
+
 type dataPointWithEntityValues struct {
        *measurev1.DataPointValue
        entityValues []*modelv1.TagValue
@@ -199,7 +243,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord) err
                iwr := &measurev1.InternalWriteRequest{
                        Request: &measurev1.WriteRequest{
                                MessageId: uint64(time.Now().UnixNano()),
-                               Metadata:  &commonv1.Metadata{Name: 
schema.TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
+                               Metadata:  &commonv1.Metadata{Name: 
TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
                                DataPoint: &measurev1.DataPointValue{
                                        Timestamp: timestamppb.New(eventTime),
                                        TagFamilies: 
[]*modelv1.TagFamilyForWrite{
@@ -270,94 +314,155 @@ func (t *topNStreamingProcessor) handleError() {
 
 // topNProcessorManager manages multiple topNStreamingProcessor(s) belonging 
to a single measure.
 type topNProcessorManager struct {
-       l            *logger.Logger
-       pipeline     queue.Queue
-       m            *measure
-       s            logical.TagSpecRegistry
-       processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
-       topNSchemas  []*databasev1.TopNAggregation
+       l               *logger.Logger
+       pipeline        queue.Queue
+       m               *measure
+       s               logical.TagSpecRegistry
+       registeredTasks []*databasev1.TopNAggregation
+       processorList   []*topNStreamingProcessor
        sync.RWMutex
 }
 
+func (manager *topNProcessorManager) init(m *measure) {
+       manager.Lock()
+       defer manager.Unlock()
+       if manager.m != nil {
+               return
+       }
+       manager.m = m
+       tagMapSpec := logical.TagSpecMap{}
+       tagMapSpec.RegisterTagFamilies(m.schema.GetTagFamilies())
+       for i := range manager.registeredTasks {
+               if err := manager.start(manager.registeredTasks[i]); err != nil 
{
+                       manager.l.Err(err).Msg("fail to start processor")
+               }
+       }
+}
+
 func (manager *topNProcessorManager) Close() error {
        manager.Lock()
        defer manager.Unlock()
        var err error
-       for _, processorList := range manager.processorMap {
-               for _, processor := range processorList {
-                       err = multierr.Append(err, processor.Close())
-               }
+       for _, processor := range manager.processorList {
+               err = multierr.Append(err, processor.Close())
        }
+       manager.processorList = nil
+       manager.registeredTasks = nil
        return err
 }
 
-func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID 
uint32, request *measurev1.InternalWriteRequest) {
+func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID 
uint32, request *measurev1.InternalWriteRequest, measure *measure) {
        go func() {
                manager.RLock()
                defer manager.RUnlock()
-               for _, processorList := range manager.processorMap {
-                       for _, processor := range processorList {
-                               processor.src <- 
flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
-                                       request.GetRequest().GetDataPoint(),
-                                       request.GetEntityValues(),
-                                       seriesID,
-                                       shardID,
-                               }, 
request.GetRequest().GetDataPoint().GetTimestamp())
-                       }
+               if manager.m == nil {
+                       manager.RUnlock()
+                       manager.init(measure)
+                       manager.RLock()
+               }
+               for _, processor := range manager.processorList {
+                       processor.src <- 
flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
+                               request.GetRequest().GetDataPoint(),
+                               request.GetEntityValues(),
+                               seriesID,
+                               shardID,
+                       }, request.GetRequest().GetDataPoint().GetTimestamp())
                }
        }()
 }
 
-func (manager *topNProcessorManager) start() error {
-       interval := manager.m.interval
-       for _, topNSchema := range manager.topNSchemas {
-               sortDirections := make([]modelv1.Sort, 0, 2)
-               if topNSchema.GetFieldValueSort() == 
modelv1.Sort_SORT_UNSPECIFIED {
-                       sortDirections = append(sortDirections, 
modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
-               } else {
-                       sortDirections = append(sortDirections, 
topNSchema.GetFieldValueSort())
-               }
-
-               processorList := make([]*topNStreamingProcessor, 
len(sortDirections))
-               for i, sortDirection := range sortDirections {
-                       srcCh := make(chan interface{})
-                       src, _ := sources.NewChannel(srcCh)
-                       name := 
strings.Join([]string{topNSchema.GetMetadata().Group, 
topNSchema.GetMetadata().Name, modelv1.Sort_name[int32(sortDirection)]}, "-")
-                       streamingFlow := streaming.New(name, src)
-
-                       filters, buildErr := 
manager.buildFilter(topNSchema.GetCriteria())
-                       if buildErr != nil {
-                               return buildErr
-                       }
-                       streamingFlow = streamingFlow.Filter(filters)
-
-                       mapper, innerErr := 
manager.buildMapper(topNSchema.GetFieldName(), 
topNSchema.GetGroupByTagNames()...)
-                       if innerErr != nil {
-                               return innerErr
-                       }
-                       streamingFlow = streamingFlow.Map(mapper)
-                       processor := &topNStreamingProcessor{
-                               m:             manager.m,
-                               l:             manager.l,
-                               interval:      interval,
-                               topNSchema:    topNSchema,
-                               sortDirection: sortDirection,
-                               src:           srcCh,
-                               in:            make(chan flow.StreamRecord),
-                               stopCh:        make(chan struct{}),
-                               streamingFlow: streamingFlow,
-                               pipeline:      manager.pipeline,
-                               buf:           make([]byte, 0, 64),
+func (manager *topNProcessorManager) register(topNSchema 
*databasev1.TopNAggregation) {
+       manager.Lock()
+       defer manager.Unlock()
+       exist := false
+       for i := range manager.registeredTasks {
+               if manager.registeredTasks[i].GetMetadata().GetName() == 
topNSchema.GetMetadata().GetName() {
+                       exist = true
+                       if 
manager.registeredTasks[i].GetMetadata().GetModRevision() < 
topNSchema.GetMetadata().GetModRevision() {
+                               prev := manager.registeredTasks[i]
+                               prevProcessors := manager.removeProcessors(prev)
+                               if err := manager.start(topNSchema); err != nil 
{
+                                       manager.l.Err(err).Msg("fail to start 
the new processor")
+                                       return
+                               }
+                               manager.registeredTasks[i] = topNSchema
+                               for _, processor := range prevProcessors {
+                                       if err := processor.Close(); err != nil 
{
+                                               manager.l.Err(err).Msg("fail to 
close the prev processor")
+                                       }
+                               }
                        }
-                       processorList[i] = processor.start()
                }
+       }
+       if exist {
+               return
+       }
+       manager.registeredTasks = append(manager.registeredTasks, topNSchema)
+       if err := manager.start(topNSchema); err != nil {
+               manager.l.Err(err).Msg("fail to start processor")
+       }
+}
 
-               manager.processorMap[topNSchema.GetSourceMeasure()] = 
processorList
+func (manager *topNProcessorManager) start(topNSchema 
*databasev1.TopNAggregation) error {
+       if manager.m == nil {
+               return nil
        }
+       interval := manager.m.interval
+       sortDirections := make([]modelv1.Sort, 0, 2)
+       if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+               sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, 
modelv1.Sort_SORT_DESC)
+       } else {
+               sortDirections = append(sortDirections, 
topNSchema.GetFieldValueSort())
+       }
+
+       processorList := make([]*topNStreamingProcessor, len(sortDirections))
+       for i, sortDirection := range sortDirections {
+               srcCh := make(chan interface{})
+               src, _ := sources.NewChannel(srcCh)
+               name := strings.Join([]string{topNSchema.GetMetadata().Group, 
topNSchema.GetMetadata().Name, modelv1.Sort_name[int32(sortDirection)]}, "-")
+               streamingFlow := streaming.New(name, src)
+
+               filters, buildErr := 
manager.buildFilter(topNSchema.GetCriteria())
+               if buildErr != nil {
+                       return buildErr
+               }
+               streamingFlow = streamingFlow.Filter(filters)
 
+               mapper, innerErr := 
manager.buildMapper(topNSchema.GetFieldName(), 
topNSchema.GetGroupByTagNames()...)
+               if innerErr != nil {
+                       return innerErr
+               }
+               streamingFlow = streamingFlow.Map(mapper)
+               processor := &topNStreamingProcessor{
+                       m:             manager.m,
+                       l:             manager.l,
+                       interval:      interval,
+                       topNSchema:    topNSchema,
+                       sortDirection: sortDirection,
+                       src:           srcCh,
+                       in:            make(chan flow.StreamRecord),
+                       stopCh:        make(chan struct{}),
+                       streamingFlow: streamingFlow,
+                       pipeline:      manager.pipeline,
+                       buf:           make([]byte, 0, 64),
+               }
+               processorList[i] = processor.start()
+       }
+       manager.processorList = append(manager.processorList, processorList...)
        return nil
 }
 
+func (manager *topNProcessorManager) removeProcessors(topNSchema 
*databasev1.TopNAggregation) []*topNStreamingProcessor {
+       var processors []*topNStreamingProcessor
+       for i := range manager.processorList {
+               if manager.processorList[i].topNSchema.GetMetadata().GetName() 
== topNSchema.GetMetadata().GetName() {
+                       processors = append(processors, 
manager.processorList[i])
+                       manager.processorList = 
append(manager.processorList[:i], manager.processorList[i+1:]...)
+               }
+       }
+       return processors
+}
+
 func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) 
(flow.UnaryFunc[bool], error) {
        // if criteria is nil, we handle all incoming elements
        if criteria == nil {
@@ -387,7 +492,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName 
string, groupByNames
                return spec.GetName() == fieldName
        })
        if fieldIdx == -1 {
-               return nil, errors.New("invalid fieldName")
+               return nil, fmt.Errorf("field %s is not found in %s schema", 
fieldName, manager.m.GetSchema().GetMetadata().GetName())
        }
        if len(groupByNames) == 0 {
                return func(_ context.Context, request any) any {
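
The topn.go changes above keep one topNProcessorManager per source measure in a
sync.Map, created lazily with LoadOrStore and torn down on delete. A hedged,
simplified sketch of that registry pattern, with placeholder manager and key
types that are not the real ones:

    package main

    import (
        "fmt"
        "sync"
    )

    type manager struct{ name string }

    func (m *manager) Close() error { return nil }

    type registry struct{ managers sync.Map }

    // getOrCreate returns the existing manager for key or stores a fresh one,
    // loosely mirroring getSteamingManager's LoadOrStore call.
    func (r *registry) getOrCreate(key string) *manager {
        v, _ := r.managers.LoadOrStore(key, &manager{name: key})
        return v.(*manager)
    }

    // stop closes and removes the manager, loosely mirroring stopSteamingManager.
    func (r *registry) stop(key string) {
        if v, ok := r.managers.Load(key); ok {
            _ = v.(*manager).Close()
            r.managers.Delete(key)
        }
    }

    func main() {
        var r registry
        fmt.Println(r.getOrCreate("default/service_cpm").name)
        r.stop("default/service_cpm")
    }
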
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index da6f8f9d..ba62bc97 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -165,15 +165,15 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        }
        dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
 
-       if stm.processorManager != nil {
-               stm.processorManager.onMeasureWrite(uint64(series.ID), 
uint32(shardID), &measurev1.InternalWriteRequest{
+       if p, _ := 
w.schemaRepo.topNProcessorMap.Load(getKey(stm.schema.GetMetadata())); p != nil {
+               p.(*topNProcessorManager).onMeasureWrite(uint64(series.ID), 
uint32(shardID), &measurev1.InternalWriteRequest{
                        Request: &measurev1.WriteRequest{
                                Metadata:  stm.GetSchema().Metadata,
                                DataPoint: req.DataPoint,
                                MessageId: uint64(time.Now().UnixNano()),
                        },
                        EntityValues: writeEvent.EntityValues,
-               })
+               }, stm)
        }
 
        doc := index.Document{
@@ -223,10 +223,12 @@ func (w *writeCallback) newDpt(tsdb 
storage.TSDB[*tsTable, option], dpg *dataPoi
 
 func (w *writeCallback) handleTagFamily(stm *measure, req 
*measurev1.WriteRequest) ([]nameValues, []index.Field) {
        tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies))
-       if len(stm.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
+       is := stm.indexSchema.Load().(indexSchema)
+       if len(is.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
                logger.Panicf("metadata crashed, tag family rule length %d, tag 
family length %d",
-                       len(stm.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
+                       len(is.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
        }
+
        var fields []index.Field
        for i := range stm.GetSchema().GetTagFamilies() {
                var tagFamily *modelv1.TagFamilyForWrite
@@ -235,7 +237,7 @@ func (w *writeCallback) handleTagFamily(stm *measure, req 
*measurev1.WriteReques
                } else {
                        tagFamily = req.DataPoint.TagFamilies[i]
                }
-               tfr := stm.indexRuleLocators.TagFamilyTRule[i]
+               tfr := is.indexRuleLocators.TagFamilyTRule[i]
                tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
                tf := nameValues{
                        name: tagFamilySpec.Name,
@@ -283,7 +285,7 @@ func (w *writeCallback) handleTagFamily(stm *measure, req 
*measurev1.WriteReques
                                }
                                continue
                        }
-                       _, isEntity := stm.indexRuleLocators.EntitySet[t.Name]
+                       _, isEntity := is.indexRuleLocators.EntitySet[t.Name]
                        if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
                                continue
                        }
@@ -301,8 +303,9 @@ func (w *writeCallback) 
appendEntityTagsToIndexFields(fields []index.Field, stm
        f.Index = true
        f.NoSort = true
        fields = append(fields, f)
+       is := stm.indexSchema.Load().(indexSchema)
        for i := range stm.schema.Entity.TagNames {
-               if _, exists := stm.indexTagMap[stm.schema.Entity.TagNames[i]]; 
exists {
+               if _, exists := is.indexTagMap[stm.schema.Entity.TagNames[i]]; 
exists {
                        continue
                }
                tagName := stm.schema.Entity.TagNames[i]
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index ba0f57a0..b70336f8 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -62,17 +62,18 @@ func NewClient(forceRegisterNode bool) (Service, error) {
 }
 
 type clientService struct {
-       schemaRegistry    schema.Registry
-       closer            *run.Closer
-       namespace         string
-       etcdUsername      string
-       etcdPassword      string
-       etcdTLSCAFile     string
-       etcdTLSCertFile   string
-       etcdTLSKeyFile    string
-       endpoints         []string
-       registryTimeout   time.Duration
-       forceRegisterNode bool
+       schemaRegistry       schema.Registry
+       closer               *run.Closer
+       namespace            string
+       etcdUsername         string
+       etcdPassword         string
+       etcdTLSCAFile        string
+       etcdTLSCertFile      string
+       etcdTLSKeyFile       string
+       endpoints            []string
+       registryTimeout      time.Duration
+       etcdFullSyncInterval time.Duration
+       forceRegisterNode    bool
 }
 
 func (s *clientService) SchemaRegistry() schema.Registry {
@@ -89,6 +90,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
        fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate")
        fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.")
        fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry")
+       fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 30*time.Minute, "The interval for full sync etcd")
        return fs
 }
 
@@ -123,6 +125,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
                        schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
                        schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
                        schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
+                       schema.ConfigureWatchCheckInterval(s.etcdFullSyncInterval),
                )
                if errors.Is(err, context.DeadlineExceeded) {
                        select {
@@ -193,6 +196,9 @@ func (s *clientService) PreRun(ctx context.Context) error {
 }
 
 func (s *clientService) Serve() run.StopNotify {
+       if s.schemaRegistry != nil {
+               s.schemaRegistry.StartWatcher()
+       }
        return s.closer.CloseNotify()
 }
 
diff --git a/banyand/metadata/embeddedserver/server.go b/banyand/metadata/embeddedserver/server.go
index 44bd4864..908cbe7b 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -81,6 +81,7 @@ func (s *server) PreRun(ctx context.Context) error {
 }
 
 func (s *server) Serve() run.StopNotify {
+       _ = s.Service.Serve()
        return s.metaServer.StoppingNotify()
 }
 
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 4f2d9ed4..4e3d51e9 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -40,6 +40,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+const (
+       minCheckInterval     = time.Second * 5
+       defaultCheckInterval = time.Minute * 10
+)
+
 var (
        _ Stream           = (*etcdSchemaRegistry)(nil)
        _ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
@@ -101,13 +106,32 @@ func ConfigureEtcdTLSCertAndKey(certFile string, keyFile string) RegistryOption
        }
 }
 
+// ConfigureWatchCheckInterval sets the interval to check the watcher.
+func ConfigureWatchCheckInterval(d time.Duration) RegistryOption {
+       return func(config *etcdSchemaRegistryConfig) {
+               if d >= minCheckInterval {
+                       config.checkInterval = d
+               }
+       }
+}
+
+// CheckInterval sets the interval to check the watcher.
+func CheckInterval(d time.Duration) WatcherOption {
+       return func(wc *watcherConfig) {
+               if d >= minCheckInterval {
+                       wc.checkInterval = d
+               }
+       }
+}
+
 type etcdSchemaRegistry struct {
-       namespace string
-       client    *clientv3.Client
-       closer    *run.Closer
-       l         *logger.Logger
-       watchers  []*watcher
-       mux       sync.RWMutex
+       client        *clientv3.Client
+       closer        *run.Closer
+       l             *logger.Logger
+       watchers      map[Kind]*watcher
+       namespace     string
+       checkInterval time.Duration
+       mux           sync.RWMutex
 }
 
 type etcdSchemaRegistryConfig struct {
@@ -118,6 +142,7 @@ type etcdSchemaRegistryConfig struct {
        tlsCertFile     string
        tlsKeyFile      string
        serverEndpoints []string
+       checkInterval   time.Duration
 }
 
 func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler EventHandler) {
@@ -142,20 +167,42 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
                        return
                }
                for i := range kinds {
-                       e.l.Info().Str("name", name).Stringer("kind", kinds[i]).Msg("registering watcher")
-                       w := e.newWatcherWithRevision(name, kinds[i], revisions[i], handler)
-                       if w != nil {
-                               e.watchers = append(e.watchers, w)
-                       }
+                       e.registerToWatcher(name, kinds[i], revisions[i], handler)
                }
                return
        }
        for i := range kinds {
-               e.l.Info().Str("name", name).Stringer("kind", kinds[i]).Msg("registering watcher")
-               w := e.NewWatcher(name, kinds[i], handler)
-               if w != nil {
-                       e.watchers = append(e.watchers, w)
+               e.registerToWatcher(name, kinds[i], 0, handler)
+       }
+}
+
+func (e *etcdSchemaRegistry) registerToWatcher(name string, kind Kind, revision int64, handler EventHandler) {
+       if w, ok := e.watchers[kind]; ok {
+               e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to an existing watcher")
+               w.AddHandler(handler)
+               if w.revision > revision {
+                       w.revision = revision
                }
+               return
+       }
+       e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to a new watcher")
+       w := e.newWatcherWithRevision(name, kind, revision, CheckInterval(e.checkInterval))
+       w.AddHandler(handler)
+       e.watchers[kind] = w
+}
+
+func (e *etcdSchemaRegistry) Compact(ctx context.Context, revision int64) error {
+       if !e.closer.AddRunning() {
+               return ErrClosed
+       }
+       defer e.closer.Done()
+       _, err := e.client.Compact(ctx, revision)
+       return err
+}
+
+func (e *etcdSchemaRegistry) StartWatcher() {
+       for _, w := range e.watchers {
+               w.Start()
        }
 }
 
@@ -202,10 +249,12 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
                return nil, err
        }
        reg := &etcdSchemaRegistry{
-               namespace: registryConfig.namespace,
-               client:    client,
-               closer:    run.NewCloser(1),
-               l:         logger.GetLogger("schema-registry"),
+               namespace:     registryConfig.namespace,
+               client:        client,
+               closer:        run.NewCloser(1),
+               l:             logger.GetLogger("schema-registry"),
+               checkInterval: registryConfig.checkInterval,
+               watchers:      make(map[Kind]*watcher),
        }
        return reg, nil
 }
@@ -525,17 +574,21 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
        }
 }
 
-func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, handler watchEventHandler) *watcher {
-       return e.newWatcherWithRevision(name, kind, 0, handler)
+func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, opts ...WatcherOption) *watcher {
+       return e.newWatcherWithRevision(name, kind, 0, opts...)
 }
 
-func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, revision int64, handler watchEventHandler) *watcher {
-       return newWatcher(e.client, watcherConfig{
-               key:      e.prependNamespace(kind.key()),
-               kind:     kind,
-               handler:  handler,
-               revision: revision,
-       }, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
+func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, revision int64, opts ...WatcherOption) *watcher {
+       wc := watcherConfig{
+               key:           e.prependNamespace(kind.key()),
+               kind:          kind,
+               revision:      revision,
+               checkInterval: 5 * time.Minute, // Default value
+       }
+       for _, opt := range opts {
+               opt(&wc)
+       }
+       return newWatcher(e.client, wc, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
 }
 
 func listPrefixesForEntity(group, entityPrefix string) string {
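
Two behaviours in etcd.go above are worth calling out: watchers are now shared
per Kind (registering a handler for a kind that already has a watcher appends
the handler and lowers the watcher's starting revision to the smallest one
requested, so no handler misses events), and both interval options silently
drop values below minCheckInterval. A small package-internal sketch of the
clamping, written as an illustrative test rather than code from the patch:

    package schema

    import (
            "testing"
            "time"
    )

    // TestCheckIntervalClamp is illustrative only.
    func TestCheckIntervalClamp(t *testing.T) {
            wc := watcherConfig{checkInterval: defaultCheckInterval}
            for _, opt := range []WatcherOption{
                    CheckInterval(time.Second), // below minCheckInterval: ignored
                    CheckInterval(time.Minute), // accepted
            } {
                    opt(&wc)
            }
            if wc.checkInterval != time.Minute {
                    t.Fatalf("got %v, want %v", wc.checkInterval, time.Minute)
            }
    }
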
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 94d9c324..8f4a8e35 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -51,6 +51,9 @@ type ListOpt struct {
        Group string
 }
 
+// WatcherOption is a placeholder for watcher configuration.
+type WatcherOption func(*watcherConfig)
+
 // Registry allowing depositing resources.
 type Registry interface {
        io.Closer
@@ -62,8 +65,10 @@ type Registry interface {
        TopNAggregation
        Node
        RegisterHandler(string, Kind, EventHandler)
-       NewWatcher(string, Kind, watchEventHandler) *watcher
+       NewWatcher(string, Kind, ...WatcherOption) *watcher
        Register(context.Context, Metadata, bool) error
+       Compact(context.Context, int64) error
+       StartWatcher()
 }
 
 // TypeMeta defines the identity and type of an Event.
diff --git a/banyand/metadata/schema/watcher.go b/banyand/metadata/schema/watcher.go
index 2464efe4..3c635237 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -18,13 +18,16 @@
 package schema
 
 import (
-       "errors"
+       "context"
+       "sync"
        "time"
 
+       "github.com/pkg/errors"
        mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
        v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
        clientv3 "go.etcd.io/etcd/client/v3"
 
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -34,69 +37,68 @@ type watchEventHandler interface {
        OnDelete(Metadata)
 }
 type watcherConfig struct {
-       handler  watchEventHandler
-       key      string
-       revision int64
-       kind     Kind
+       key           string
+       revision      int64
+       kind          Kind
+       checkInterval time.Duration
+}
+
+type cacheEntry struct {
+       valueHash   uint64 // Hash of the value
+       modRevision int64  // Last modified revision
 }
 
 type watcher struct {
-       handler  watchEventHandler
-       cli      *clientv3.Client
-       closer   *run.Closer
-       l        *logger.Logger
-       key      string
-       revision int64
-       kind     Kind
+       cli           *clientv3.Client
+       closer        *run.Closer
+       l             *logger.Logger
+       ticker        *time.Ticker
+       cache         map[string]cacheEntry
+       key           string
+       handlers      []watchEventHandler
+       revision      int64
+       kind          Kind
+       checkInterval time.Duration
+       mu            sync.RWMutex
+       startOnce     sync.Once
 }
 
 func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) *watcher {
-       w := &watcher{
-               cli:      cli,
-               key:      wc.key,
-               kind:     wc.kind,
-               handler:  wc.handler,
-               revision: wc.revision,
-               closer:   run.NewCloser(1),
-               l:        l,
+       if wc.checkInterval == 0 {
+               wc.checkInterval = 5 * time.Minute
        }
-       revision := w.revision
-       if revision < 1 {
-               revision = w.allEvents()
+       w := &watcher{
+               cli:           cli,
+               key:           wc.key,
+               kind:          wc.kind,
+               revision:      wc.revision,
+               closer:        run.NewCloser(1),
+               l:             l,
+               checkInterval: wc.checkInterval,
+               cache:         make(map[string]cacheEntry),
        }
-       go w.watch(revision)
        return w
 }
 
-func (w *watcher) Close() {
-       w.closer.Done()
-       w.closer.CloseThenWait()
+func (w *watcher) Start() {
+       w.startOnce.Do(func() {
+               if w.revision < 1 {
+                       w.periodicSync()
+               }
+               go w.watch(w.revision)
+       })
 }
 
-func (w *watcher) allEvents() int64 {
-       cli := w.cli
-       var resp *clientv3.GetResponse
-       start := time.Now()
-       var eventHandleTime time.Duration
-       var eventSize int
-       for {
-               var err error
-               if resp, err = cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix()); err == nil {
-                       startHandle := time.Now()
-                       eventSize = len(resp.Kvs)
-                       w.handleAllEvents(resp.Kvs)
-                       eventHandleTime = time.Since(startHandle)
-                       break
-               }
-               select {
-               case <-w.closer.CloseNotify():
-                       return -1
-               case <-time.After(1 * time.Second):
-               }
+func (w *watcher) AddHandler(handler watchEventHandler) {
+       if w.handlers == nil {
+               w.handlers = make([]watchEventHandler, 0)
        }
-       w.l.Info().Dur("event_handle_time", eventHandleTime).Dur("total_time", time.Since(start)).
-               Int("event_size", eventSize).Str("key", w.key).Msg("watcher all events")
-       return resp.Header.Revision
+       w.handlers = append(w.handlers, handler)
+}
+
+func (w *watcher) Close() {
+       w.closer.Done()
+       w.closer.CloseThenWait()
 }
 
 func (w *watcher) watch(revision int64) {
@@ -105,17 +107,25 @@ func (w *watcher) watch(revision int64) {
        }
        defer w.closer.Done()
        cli := w.cli
+
+       w.ticker = time.NewTicker(w.checkInterval)
+       defer w.ticker.Stop()
+
 OUTER:
        for {
-               if revision < 0 {
-                       revision = w.allEvents()
-               }
                select {
                case <-w.closer.CloseNotify():
                        return
                default:
                }
 
+               if revision < 0 {
+                       // Use periodic sync to recover state and get new revision
+                       w.periodicSync()
+                       revision = w.revision
+                       continue
+               }
+
                wch := cli.Watch(w.closer.Ctx(), w.key,
                        clientv3.WithPrefix(),
                        clientv3.WithRev(revision+1),
@@ -124,39 +134,34 @@ OUTER:
                if wch == nil {
                        continue
                }
+
                for {
                        select {
                        case <-w.closer.CloseNotify():
-                               w.l.Info().Msgf("watcher closed")
+                               w.l.Info().Msg("watcher closed")
                                return
+                       case <-w.ticker.C:
+                               w.periodicSync()
                        case watchResp, ok := <-wch:
                                if !ok {
                                        select {
                                        case <-w.closer.CloseNotify():
                                                return
                                        default:
-                                               break OUTER
+                                               revision = -1
+                                               continue OUTER
                                        }
                                }
                                if err := watchResp.Err(); err != nil {
-                                       select {
-                                       case <-w.closer.CloseNotify():
-                                               return
-                                       default:
-                                               if errors.Is(err, v3rpc.ErrCompacted) {
-                                                       revision = -1
-                                                       break
-                                               }
-                                               continue
+                                       if errors.Is(err, v3rpc.ErrCompacted) {
+                                               revision = -1
+                                               continue OUTER
                                        }
+                                       continue
                                }
+                               w.revision = watchResp.Header.Revision
                                for _, event := range watchResp.Events {
-                                       select {
-                                       case <-w.closer.CloseNotify():
-                                               return
-                                       default:
-                                               w.handle(event, &watchResp)
-                                       }
+                                       w.handle(event, &watchResp)
                                }
                        }
                }
@@ -164,31 +169,119 @@ OUTER:
 }
 
 func (w *watcher) handle(watchEvent *clientv3.Event, watchResp *clientv3.WatchResponse) {
+       keyStr := string(watchEvent.Kv.Key)
+       entry := cacheEntry{
+               valueHash:   convert.Hash(watchEvent.Kv.Value),
+               modRevision: watchEvent.Kv.ModRevision,
+       }
+
+       handlers := w.handlers
+       if len(handlers) == 0 {
+               w.l.Panic().Msg("no handlers registered")
+               return
+       }
+
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
        switch watchEvent.Type {
        case mvccpb.PUT:
-               md, err := w.kind.Unmarshal(watchEvent.Kv)
-               if err != nil {
-                       w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message")
-                       return
+               if existing, exists := w.cache[keyStr]; !exists || existing.modRevision < entry.modRevision {
+                       w.cache[keyStr] = entry
+
+                       md, err := w.kind.Unmarshal(watchEvent.Kv)
+                       if err != nil {
+                               w.l.Error().Stringer("event_header", &watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message")
+                               return
+                       }
+                       for i := range handlers {
+                               handlers[i].OnAddOrUpdate(md)
+                       }
                }
-               w.handler.OnAddOrUpdate(md)
        case mvccpb.DELETE:
+               delete(w.cache, keyStr)
                md, err := w.kind.Unmarshal(watchEvent.PrevKv)
                if err != nil {
                        w.l.Error().Stringer("event_header", 
&watchResp.Header).AnErr("err", err).Msg("failed to unmarshal message")
                        return
                }
-               w.handler.OnDelete(md)
+               for i := range handlers {
+                       handlers[i].OnDelete(md)
+               }
        }
 }
 
-func (w *watcher) handleAllEvents(kvs []*mvccpb.KeyValue) {
-       for i := 0; i < len(kvs); i++ {
-               md, err := w.kind.Unmarshal(kvs[i])
-               if err != nil {
-                       w.l.Error().AnErr("err", err).Msg("failed to unmarshal message")
+func (w *watcher) periodicSync() {
+       resp, err := w.cli.Get(w.closer.Ctx(), w.key, clientv3.WithPrefix())
+       if err != nil {
+               if !errors.Is(err, context.Canceled) {
+                       w.l.Error().Err(err).Msg("periodic sync failed to fetch keys")
+               }
+               return
+       }
+
+       currentState := make(map[string]cacheEntry, len(resp.Kvs))
+       for _, kv := range resp.Kvs {
+               currentState[string(kv.Key)] = cacheEntry{
+                       valueHash:   convert.Hash(kv.Value),
+                       modRevision: kv.ModRevision,
+               }
+       }
+
+       handlers := w.handlers
+       if len(handlers) == 0 {
+               w.l.Panic().Msg("no handlers registered")
+               return
+       }
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       // Detect deletions and changes
+       for cachedKey, cachedEntry := range w.cache {
+               currentEntry, exists := currentState[cachedKey]
+               if !exists {
+                       // Handle deletion
+                       delete(w.cache, cachedKey)
+                       if md, err := w.getFromStore(cachedKey); err == nil {
+                               for i := range handlers {
+                                       handlers[i].OnDelete(*md)
+                               }
+                       }
                        continue
                }
-               w.handler.OnAddOrUpdate(md)
+
+               if currentEntry.valueHash != cachedEntry.valueHash {
+                       // Handle update
+                       if md, err := w.getFromStore(cachedKey); err == nil {
+                               for i := range handlers {
+                                       handlers[i].OnAddOrUpdate(*md)
+                               }
+                               w.cache[cachedKey] = currentEntry
+                       }
+               }
+       }
+
+       // Detect additions
+       for key, entry := range currentState {
+               if _, exists := w.cache[key]; !exists {
+                       if md, err := w.getFromStore(key); err == nil {
+                               for i := range handlers {
+                                       handlers[i].OnAddOrUpdate(*md)
+                               }
+                               w.cache[key] = entry
+                       }
+               }
+       }
+}
+
+func (w *watcher) getFromStore(key string) (*Metadata, error) {
+       resp, err := w.cli.Get(w.closer.Ctx(), key)
+       if err != nil {
+               return nil, err
+       }
+       if len(resp.Kvs) == 0 {
+               return nil, errors.New("key not found")
        }
+       md, err := w.kind.Unmarshal(resp.Kvs[0])
+       return &md, err
 }
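
The rewritten watcher above no longer replays the whole key space through
allEvents on startup; it keeps a per-key cache of (value hash, mod revision)
and periodically reconciles that cache against a fresh etcd snapshot, which
doubles as the recovery path after compaction or a closed watch channel. A
standalone sketch of that reconciliation step, with illustrative types rather
than the actual ones:

    package example

    // entry stands in for the watcher's cacheEntry.
    type entry struct {
            hash uint64 // hash of the stored value
            rev  int64  // last observed mod revision
    }

    // diffStates reports which keys need OnAddOrUpdate or OnDelete callbacks.
    func diffStates(cache, current map[string]entry) (upserts, deletes []string) {
            for k, cached := range cache {
                    cur, ok := current[k]
                    switch {
                    case !ok:
                            deletes = append(deletes, k) // key vanished while the watch was down
                    case cur.hash != cached.hash:
                            upserts = append(upserts, k) // value changed without a delivered event
                    }
            }
            for k := range current {
                    if _, ok := cache[k]; !ok {
                            upserts = append(upserts, k) // new key the watch never reported
                    }
            }
            return upserts, deletes
    }
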
diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go
index 65c45b40..c2fa90dd 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "sync"
        "sync/atomic"
+       "time"
 
        ginkgo "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
@@ -171,7 +172,9 @@ var _ = ginkgo.Describe("Watcher", func() {
                }
 
                // Start the watcher
-               watcher := registry.NewWatcher("test", schema.KindMeasure, mockedObj)
+               watcher := registry.NewWatcher("test", schema.KindMeasure)
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
                ginkgo.DeferCleanup(func() {
                        watcher.Close()
                })
@@ -188,7 +191,9 @@ var _ = ginkgo.Describe("Watcher", func() {
                }, flags.EventuallyTimeout).Should(gomega.BeTrue())
        })
        ginkgo.It("should handle watch events", func() {
-               watcher := registry.NewWatcher("test", schema.KindStream, mockedObj)
+               watcher := registry.NewWatcher("test", schema.KindStream)
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
                ginkgo.DeferCleanup(func() {
                        watcher.Close()
                })
@@ -246,4 +251,242 @@ var _ = ginkgo.Describe("Watcher", func() {
                                len(mockedObj.Data()) == 0
                }, flags.EventuallyTimeout).Should(gomega.BeTrue())
        })
+       ginkgo.It("should load initial state and track revisions", func() {
+               groupName := "testgroup-initial"
+               err := registry.CreateGroup(context.Background(), &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_MEASURE,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 1,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Num:  1,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Num:  3,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  "initial-key1",
+                               Group: groupName,
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"testtag"},
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "testtagfamily",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "testtag",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  "initial-key2",
+                               Group: groupName,
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"testtag"},
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "testtagfamily",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "testtag",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second))
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
+               ginkgo.DeferCleanup(func() {
+                       watcher.Close()
+               })
+
+               gomega.Eventually(func() int {
+                       return len(mockedObj.Data())
+               }, flags.EventuallyTimeout).Should(gomega.Equal(2))
+
+               gomega.Expect(mockedObj.addOrUpdateCalledNum.Load()).To(gomega.Equal(int32(2)))
+       })
+
+       ginkgo.It("should detect deletions", func() {
+               watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second))
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
+               ginkgo.DeferCleanup(func() {
+                       watcher.Close()
+               })
+
+               groupName := "testgroup-delete"
+               measureName := "delete-key"
+               err := registry.CreateGroup(context.Background(), &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_MEASURE,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 1,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Num:  1,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Num:  3,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               _, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  measureName,
+                               Group: groupName,
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"testtag"},
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "testtagfamily",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "testtag",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               gomega.Eventually(func() bool {
+                       _, ok := mockedObj.Data()[measureName]
+                       return ok
+               }, flags.EventuallyTimeout).Should(gomega.BeTrue())
+
+               deleted, err := registry.DeleteMeasure(context.Background(), &commonv1.Metadata{
+                       Name:  measureName,
+                       Group: groupName,
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(deleted).To(gomega.BeTrue())
+
+               gomega.Eventually(func() int {
+                       return int(mockedObj.deleteCalledNum.Load())
+               }, 5*time.Second).Should(gomega.Equal(1))
+               gomega.Expect(mockedObj.Data()).NotTo(gomega.HaveKey(measureName))
+       })
+
+       ginkgo.It("should recover state after compaction", func() {
+               watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Hour))
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
+               ginkgo.DeferCleanup(func() {
+                       watcher.Close()
+               })
+
+               groupName := "testgroup-compact"
+               measureName := "compact-key"
+               err := registry.CreateGroup(context.Background(), &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: groupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_MEASURE,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 1,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Num:  1,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Num:  3,
+                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               modRev, err := registry.CreateMeasure(context.Background(), &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  measureName,
+                               Group: groupName,
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"testtag"},
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "testtagfamily",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "testtag",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               gomega.Eventually(func() bool {
+                       _, ok := mockedObj.Data()[measureName]
+                       return ok
+               }, flags.EventuallyTimeout).Should(gomega.BeTrue())
+
+               err = registry.Compact(context.Background(), modRev)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               updatedMeasure := &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{
+                               Name:  measureName,
+                               Group: groupName,
+                       },
+                       Entity: &databasev1.Entity{
+                               TagNames: []string{"testtag"},
+                       },
+                       TagFamilies: []*databasev1.TagFamilySpec{
+                               {
+                                       Name: "testtagfamily",
+                                       Tags: []*databasev1.TagSpec{
+                                               {
+                                                       Name: "testtag",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                               {
+                                                       Name: "testtag1",
+                                                       Type: databasev1.TagType_TAG_TYPE_STRING,
+                                               },
+                                       },
+                               },
+                       },
+               }
+               _, err = registry.UpdateMeasure(context.Background(), updatedMeasure)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               gomega.Eventually(func() int {
+                       return int(mockedObj.addOrUpdateCalledNum.Load())
+               }, 5*time.Second).Should(gomega.BeNumerically(">=", 2))
+       })
 })
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 0bb22e7b..d765391a 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -40,7 +40,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
-       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 type topNQueryProcessor struct {
@@ -105,7 +104,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("topn plan")
        }
-       topNResultMeasure, err := t.measureService.Measure(pkgschema.GetTopNSchemaMetadata(topNMetadata.Group))
+       topNResultMeasure, err := t.measureService.Measure(measure.GetTopNSchemaMetadata(topNMetadata.Group))
        if err != nil {
                ml.Error().Err(err).Str("topN", topNMetadata.GetName()).Msg("fail to find topn result measure")
                return
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index 866bfe88..af3644d8 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -256,10 +256,11 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream {
                Entity:      entity,
                TagFamilies: []*databasev1.TagFamilySpec{tagFamily},
        }
-       return &stream{
-               databaseSupplier: dbSupplier,
-               schema:           schema,
+       s := &stream{
+               schema: schema,
        }
+       s.tsdb.Store(db)
+       return s
 }
 
 func generateStreamQueryOptions(p parameter, midx mockIndex) model.StreamQueryOptions {
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 00c4b337..e4a27945 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -510,9 +510,9 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        tmpBlock.reset()
        bc.bm.tagProjection = bc.tagProjection
        var tf map[string]*dataBlock
-       for i := range bc.tagProjection {
+       for _, tp := range bc.tagProjection {
                for tfName, block := range bc.bm.tagFamilies {
-                       if bc.tagProjection[i].Family == tfName {
+                       if tp.Family == tfName {
                                if tf == nil {
                                        tf = make(map[string]*dataBlock, len(bc.tagProjection))
                                }
@@ -520,8 +520,15 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
                        }
                }
        }
+       if len(tf) == 0 {
+               return false
+       }
+
        bc.bm.tagFamilies = tf
        tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)
+       if len(tmpBlock.timestamps) == 0 {
+               return false
+       }
 
        idxList := make([]int, 0)
        var start, end int
diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go
index e8a54f03..7e3d89df 100644
--- a/banyand/stream/block_scanner.go
+++ b/banyand/stream/block_scanner.go
@@ -46,6 +46,7 @@ type blockScanResult struct {
 
 func (bs *blockScanResult) reset() {
        bs.p = nil
+       bs.qo.reset()
        bs.bm.reset()
 }
 
@@ -55,6 +56,7 @@ type blockScanResultBatch struct {
 }
 
 func (bsb *blockScanResultBatch) reset() {
+       bsb.err = nil
        for i := range bsb.bss {
                bsb.bss[i].reset()
        }
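
The extra reset calls above matter because blockScanResult and
blockScanResultBatch are pooled and reused across scans: any field that reset
does not clear leaks into the next borrower. A generic sketch of the pattern,
using a hypothetical type and pool rather than the stream package's own
helpers:

    package example

    import "sync"

    // pooledResult stands in for a reusable scan result.
    type pooledResult struct {
            err error
            ids []int64
    }

    var resultPool = sync.Pool{New: func() any { return &pooledResult{} }}

    func release(r *pooledResult) {
            // Clear every field; a stale err or id list would surface in the
            // next scan that borrows this object.
            r.err = nil
            r.ids = r.ids[:0]
            resultPool.Put(r)
    }
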
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 07fdd873..8d1bea4a 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -107,7 +107,8 @@ func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) {
                logger.Panicf("invalid kinds: %v", kinds)
                return false, nil
        }
-       return true, sr.Repository.Init(schema.KindStream)
+       _, revs := sr.Repository.Init(schema.KindStream)
+       return true, revs
 }
 
 func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
@@ -137,47 +138,29 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
                        Metadata: metadata.Spec.(*databasev1.Stream),
                })
        case schema.KindIndexRuleBinding:
-               irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding)
-               if !ok {
-                       sr.l.Warn().Msg("fail to convert message to IndexRuleBinding")
-                       return
-               }
-               if err := validate.IndexRuleBinding(irb); err != nil {
-                       sr.l.Warn().Err(err).Msg("index rule binding is ignored")
-                       return
-               }
-               if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
-                       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                       stm, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{
-                               Name:  irb.GetSubject().GetName(),
-                               Group: metadata.Group,
-                       })
-                       cancel()
-                       if err != nil {
+               if irb, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
+                       if err := validate.IndexRuleBinding(irb); err != nil {
+                               sr.l.Warn().Err(err).Msg("index rule binding is ignored")
                                return
                        }
-                       sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                               Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: stm,
-                       })
+                       if irb.GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+                               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                                       Typ:      resourceSchema.EventAddOrUpdate,
+                                       Kind:     resourceSchema.EventKindIndexRuleBinding,
+                                       Metadata: irb,
+                               })
+                       }
                }
        case schema.KindIndexRule:
-               if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil {
-                       sr.l.Warn().Err(err).Msg("index rule is ignored")
-                       return
-               }
-               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-               defer cancel()
-               subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
-               if err != nil {
-                       return
-               }
-               for _, sub := range subjects {
+               if ir, ok := metadata.Spec.(*databasev1.IndexRule); ok {
+                       if err := validate.IndexRule(metadata.Spec.(*databasev1.IndexRule)); err != nil {
+                               sr.l.Warn().Err(err).Msg("index rule is ignored")
+                               return
+                       }
                        sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                                Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: sub.(*databasev1.Stream),
+                               Kind:     resourceSchema.EventKindIndexRule,
+                               Metadata: ir,
                        })
                }
        default:
@@ -203,24 +186,23 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                        Metadata: metadata.Spec.(*databasev1.Stream),
                })
        case schema.KindIndexRuleBinding:
-               if metadata.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
-                       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                       defer cancel()
-                       m, err := sr.metadata.StreamRegistry().GetStream(ctx, &commonv1.Metadata{
-                               Name:  metadata.Name,
-                               Group: metadata.Group,
-                       })
-                       if err != nil {
-                               return
+               if binding, ok := metadata.Spec.(*databasev1.IndexRuleBinding); ok {
+                       if binding.GetSubject().Catalog == commonv1.Catalog_CATALOG_MEASURE {
+                               sr.SendMetadataEvent(resourceSchema.MetadataEvent{
+                                       Typ:      resourceSchema.EventDelete,
+                                       Kind:     resourceSchema.EventKindIndexRuleBinding,
+                                       Metadata: metadata.Spec.(*databasev1.IndexRuleBinding),
+                               })
                        }
-                       // we should update instead of delete
+               }
+       case schema.KindIndexRule:
+               if rule, ok := metadata.Spec.(*databasev1.IndexRule); ok {
                        sr.SendMetadataEvent(resourceSchema.MetadataEvent{
-                               Typ:      resourceSchema.EventAddOrUpdate,
-                               Kind:     resourceSchema.EventKindResource,
-                               Metadata: m,
+                               Typ:      resourceSchema.EventDelete,
+                               Kind:     resourceSchema.EventKindIndexRule,
+                               Metadata: rule,
                        })
                }
-       case schema.KindIndexRule:
        default:
        }
 }
@@ -249,33 +231,34 @@ func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option]
 var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
-       metadata metadata.Repo
-       pipeline queue.Queue
-       omr      observability.MetricsRegistry
-       l        *logger.Logger
-       pm       *protector.Memory
-       path     string
-       option   option
+       metadata   metadata.Repo
+       pipeline   queue.Queue
+       omr        observability.MetricsRegistry
+       l          *logger.Logger
+       pm         *protector.Memory
+       schemaRepo *schemaRepo
+       path       string
+       option     option
 }
 
 func newSupplier(path string, svc *service) *supplier {
        return &supplier{
-               path:     path,
-               metadata: svc.metadata,
-               l:        svc.l,
-               pipeline: svc.localPipeline,
-               option:   svc.option,
-               omr:      svc.omr,
-               pm:       svc.pm,
+               path:       path,
+               metadata:   svc.metadata,
+               l:          svc.l,
+               pipeline:   svc.localPipeline,
+               option:     svc.option,
+               omr:        svc.omr,
+               pm:         svc.pm,
+               schemaRepo: &svc.schemaRepo,
        }
 }
 
-func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
        streamSchema := spec.Schema().(*databasev1.Stream)
-       return openStream(shardNum, supplier, streamSpec{
-               schema:     streamSchema,
-               indexRules: spec.IndexRules(),
-       }, s.l, s.pm), nil
+       return openStream(streamSpec{
+               schema: streamSchema,
+       }, s.l, s.pm, s.schemaRepo), nil
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
@@ -332,10 +315,9 @@ func (*portableSupplier) OpenDB(_ *commonv1.Group) (io.Closer, error) {
        panic("do not support open db")
 }
 
-func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Supplier, spec resourceSchema.Resource) (io.Closer, error) {
+func (s *portableSupplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.IndexListener, error) {
        streamSchema := spec.Schema().(*databasev1.Stream)
-       return openStream(shardNum, nil, streamSpec{
-               schema:     streamSchema,
-               indexRules: spec.IndexRules(),
-       }, s.l, nil), nil
+       return openStream(streamSpec{
+               schema: streamSchema,
+       }, s.l, nil, nil), nil
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 24658da3..4ed25ab4 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -40,8 +40,6 @@ import (
 
 const checkDoneEvery = 128
 
-var nilResult = model.StreamQueryResult(nil)
-
 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and series are required")
@@ -49,11 +47,17 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
        if len(sqo.TagProjection) == 0 {
                return nil, errors.New("invalid query options: tagProjection is required")
        }
-       db := s.databaseSupplier.SupplyTSDB()
+       var tsdb storage.TSDB[*tsTable, option]
+       db := s.tsdb.Load()
        if db == nil {
-               return nilResult, nil
+               tsdb, err = s.schemaRepo.loadTSDB(s.group)
+               if err != nil {
+                       return nil, err
+               }
+               s.tsdb.Store(tsdb)
+       } else {
+               tsdb = db.(storage.TSDB[*tsTable, option])
        }
-       tsdb := db.(storage.TSDB[*tsTable, option])
        segments := tsdb.SelectSegments(*sqo.TimeRange)
        if len(segments) < 1 {
                return bypassQueryResultInstance, nil
@@ -144,8 +148,17 @@ type queryOptions struct {
        maxTimestamp int64
 }
 
+func (qo *queryOptions) reset() {
+       qo.StreamQueryOptions.Reset()
+       qo.elementFilter = nil
+       qo.seriesToEntity = nil
+       qo.sortedSids = nil
+       qo.minTimestamp = 0
+       qo.maxTimestamp = 0
+}
+
 func (qo *queryOptions) copyFrom(other *queryOptions) {
-       qo.StreamQueryOptions = other.StreamQueryOptions
+       qo.StreamQueryOptions.CopyFrom(&other.StreamQueryOptions)
        qo.elementFilter = other.elementFilter
        qo.seriesToEntity = other.seriesToEntity
        qo.sortedSids = other.sortedSids
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index e49e0429..80094ba7 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -131,6 +131,7 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes
                go func(i int) {
                        select {
                        case <-ctx.Done():
+                               releaseBlockCursor(qr.data[i])
                                cursorChan <- i
                                return
                        default:
@@ -167,7 +168,6 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes
                return blankCursorList[i] > blankCursorList[j]
        })
        for _, index := range blankCursorList {
-               releaseBlockCursor(qr.data[index])
                qr.data = append(qr.data[:index], qr.data[index+1:]...)
        }
        qr.loaded = true
diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go
index 5f07f2ea..d45a46ef 100644
--- a/banyand/stream/query_by_ts.go
+++ b/banyand/stream/query_by_ts.go
@@ -48,7 +48,7 @@ type tsResult struct {
 
 func (t *tsResult) Pull(ctx context.Context) *model.StreamResult {
        if len(t.segments) == 0 {
-               return &model.StreamResult{}
+               return nil
        }
        if err := t.scanSegment(ctx); err != nil {
                return &model.StreamResult{Error: err}
@@ -123,11 +123,11 @@ func (t *tsResult) scanSegment(ctx context.Context) error {
                                                blockHeap.Push(bc)
                                        }
                                }
-                               releaseBlockScanResultBatch(batch)
                                heap.Init(blockHeap)
                                result := blockHeap.merge(t.qo.MaxElementSize)
                                t.shards[workerID].CopyFrom(tmpResult, result)
                                blockHeap.reset()
+                               releaseBlockScanResultBatch(batch)
                        }
                        workerWg.Done()
                }(i)
@@ -158,13 +158,14 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stre
        for idx, tagFamily := range bc.tagFamilies {
                tagFamilyMap[tagFamily.name] = idx + 1
        }
+       is := sm.indexSchema.Load().(indexSchema)
        for _, tagFamilyProj := range bc.tagProjection {
                for j, tagProj := range tagFamilyProj.Names {
-                       tagSpec := sm.tagMap[tagProj]
+                       tagSpec := is.tagMap[tagProj]
                        if tagSpec.IndexedOnly {
                                continue
                        }
-                       entityPos := sm.entityMap[tagProj]
+                       entityPos := is.indexRuleLocators.EntitySet[tagProj]
                        if entityPos == 0 {
                                continue
                        }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 0b27dca6..b3a4756d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -21,7 +21,7 @@ package stream
 
 import (
        "context"
-       "io"
+       "sync/atomic"
        "time"
 
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -59,26 +59,38 @@ type Query interface {
 
 // Stream allows inspecting elements' details.
 type Stream interface {
-       io.Closer
        GetSchema() *databasev1.Stream
        GetIndexRules() []*databasev1.IndexRule
        Query(ctx context.Context, opts model.StreamQueryOptions) (model.StreamQueryResult, error)
 }
 
-var _ Stream = (*stream)(nil)
-
-type stream struct {
-       databaseSupplier  schema.Supplier
-       l                 *logger.Logger
-       schema            *databasev1.Stream
+type indexSchema struct {
        tagMap            map[string]*databasev1.TagSpec
-       entityMap         map[string]int
-       pm                *protector.Memory
-       name              string
-       group             string
        indexRuleLocators partition.IndexRuleLocator
        indexRules        []*databasev1.IndexRule
-       shardNum          uint32
+}
+
+func (i *indexSchema) parse(schema *databasev1.Stream) {
+       i.indexRuleLocators, _ = partition.ParseIndexRuleLocators(schema.GetEntity(), schema.GetTagFamilies(), i.indexRules, false)
+       i.tagMap = make(map[string]*databasev1.TagSpec)
+       for _, tf := range schema.GetTagFamilies() {
+               for _, tag := range tf.GetTags() {
+                       i.tagMap[tag.GetName()] = tag
+               }
+       }
+}
+
+var _ Stream = (*stream)(nil)
+
+type stream struct {
+       indexSchema atomic.Value
+       tsdb        atomic.Value
+       l           *logger.Logger
+       schema      *databasev1.Stream
+       pm          *protector.Memory
+       schemaRepo  *schemaRepo
+       name        string
+       group       string
 }
 
 func (s *stream) GetSchema() *databasev1.Stream {
@@ -86,48 +98,40 @@ func (s *stream) GetSchema() *databasev1.Stream {
 }
 
 func (s *stream) GetIndexRules() []*databasev1.IndexRule {
-       return s.indexRules
+       is := s.indexSchema.Load()
+       if is == nil {
+               return nil
+       }
+       return is.(indexSchema).indexRules
 }
 
-func (s *stream) Close() error {
-       return nil
+func (s *stream) OnIndexUpdate(index []*databasev1.IndexRule) {
+       var is indexSchema
+       is.indexRules = index
+       is.parse(s.schema)
+       s.indexSchema.Store(is)
 }
 
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
-       s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, false)
-       s.tagMap = make(map[string]*databasev1.TagSpec)
-       for _, tf := range s.schema.GetTagFamilies() {
-               for _, tag := range tf.GetTags() {
-                       s.tagMap[tag.GetName()] = tag
-               }
-       }
-       s.entityMap = make(map[string]int)
-       for idx, entity := range s.schema.GetEntity().GetTagNames() {
-               s.entityMap[entity] = idx + 1
-       }
+       var is indexSchema
+       is.parse(s.schema)
+       s.indexSchema.Store(is)
 }
 
 type streamSpec struct {
-       schema     *databasev1.Stream
-       indexRules []*databasev1.IndexRule
+       schema *databasev1.Stream
 }
 
-func openStream(shardNum uint32, db schema.Supplier,
-       spec streamSpec, l *logger.Logger, pm *protector.Memory,
+func openStream(spec streamSpec,
+       l *logger.Logger, pm *protector.Memory, schemaRepo *schemaRepo,
 ) *stream {
        s := &stream{
-               shardNum:   shardNum,
                schema:     spec.schema,
-               indexRules: spec.indexRules,
                l:          l,
                pm:         pm,
+               schemaRepo: schemaRepo,
        }
        s.parseSpec()
-       if db == nil {
-               return s
-       }
-
-       s.databaseSupplier = db
        return s
 }
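
stream now publishes its parsed index metadata through an atomic.Value:
OnIndexUpdate builds a fresh indexSchema and stores it, while the write and
query paths load one consistent snapshot without holding a lock. A
self-contained sketch of the same publish/read pattern, with stand-in types
rather than the patch's own:

    package example

    import "sync/atomic"

    type snapshot struct {
            rules []string // stands in for []*databasev1.IndexRule
    }

    type holder struct {
            current atomic.Value // always holds a complete snapshot value
    }

    func (h *holder) update(rules []string) {
            // Copy first, then publish; readers see the old or new snapshot, never a mix.
            h.current.Store(snapshot{rules: append([]string(nil), rules...)})
    }

    func (h *holder) read() []string {
            v := h.current.Load()
            if v == nil {
                    return nil
            }
            return v.(snapshot).rules
    }
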
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index db995f6b..9a0664ec 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -155,10 +155,12 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre
        }
        et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
 
+       is := stm.indexSchema.Load().(indexSchema)
+
        tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies))
-       if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
+       if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
                logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
-                       len(stm.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
+                       len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
        }
        var fields []index.Field
        for i := range stm.GetSchema().GetTagFamilies() {
@@ -168,7 +170,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                } else {
                        tagFamily = req.Element.TagFamilies[i]
                }
-               tfr := stm.indexRuleLocators.TagFamilyTRule[i]
+               tfr := is.indexRuleLocators.TagFamilyTRule[i]
                tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
                tf := tagValues{
                        tag: tagFamilySpec.Name,
@@ -190,7 +192,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                                        SeriesID:    series.ID,
                                }, t.Type, tagValue, r.GetNoSort())
                        }
-                       _, isEntity := 
stm.indexRuleLocators.EntitySet[tagFamilySpec.Tags[j].Name]
+                       _, isEntity := is.indexRuleLocators.EntitySet[t.Name]
                        if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
                                continue
                        }
diff --git a/bydbctl/internal/cmd/measure_test.go 
b/bydbctl/internal/cmd/measure_test.go
index 2f2c82ce..1c797f6c 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -179,7 +179,8 @@ entity:
                })
                resp := new(databasev1.MeasureRegistryServiceListResponse)
                helpers.UnmarshalYAML([]byte(out), resp)
-               Expect(resp.Measure).To(HaveLen(2))
+               // There is a _topn_result measure created by default
+               Expect(resp.Measure).To(HaveLen(3))
        })
 
        AfterEach(func() {
diff --git a/go.mod b/go.mod
index cecdc0b8..8ab8f6f8 100644
--- a/go.mod
+++ b/go.mod
@@ -2,7 +2,7 @@ module github.com/apache/skywalking-banyandb
 
 go 1.23
 
-toolchain go1.23.1
+toolchain go1.23.5
 
 require (
        github.com/RoaringBitmap/roaring v1.9.4
@@ -74,7 +74,7 @@ require (
        github.com/blugelabs/bluge_segment_api v0.2.0
        github.com/blugelabs/ice v1.0.0 // indirect
        github.com/caio/go-tdigest v3.1.0+incompatible // indirect
-       github.com/cenkalti/backoff/v4 v4.3.0 // indirect
+       github.com/cenkalti/backoff/v4 v4.3.0
        github.com/coreos/go-semver v0.3.1 // indirect
        github.com/coreos/go-systemd/v22 v22.5.0 // indirect
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // 
indirect
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index f3c69833..c60203fc 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -25,7 +25,7 @@ import (
 
 // IndexRuleLocator is a helper struct to locate the index rule by tag name.
 type IndexRuleLocator struct {
-       EntitySet      map[string]struct{}
+       EntitySet      map[string]int
        TagFamilyTRule []map[string]*databasev1.IndexRule
 }
 
@@ -42,10 +42,10 @@ type FieldIndexLocation map[string]map[string]FieldWithType
 func ParseIndexRuleLocators(entity *databasev1.Entity, families 
[]*databasev1.TagFamilySpec,
        indexRules []*databasev1.IndexRule, indexMode bool,
 ) (locators IndexRuleLocator, fil FieldIndexLocation) {
-       locators.EntitySet = make(map[string]struct{}, len(entity.TagNames))
+       locators.EntitySet = make(map[string]int, len(entity.TagNames))
        fil = make(FieldIndexLocation)
        for i := range entity.TagNames {
-               locators.EntitySet[entity.TagNames[i]] = struct{}{}
+               locators.EntitySet[entity.TagNames[i]] = i + 1
        }
        findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule {
                for i := range indexRules {
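
    Note: EntitySet now maps each entity tag to its 1-based position instead of
    being a bare set, so callers can test membership and recover the tag's
    position in the entity with a single lookup. A small illustrative sketch
    using stand-in data rather than the real protobuf types:

    package main

    import "fmt"

    func main() {
        tagNames := []string{"service_id", "instance_id"}

        entitySet := make(map[string]int, len(tagNames))
        for i := range tagNames {
            entitySet[tagNames[i]] = i + 1 // zero value still means "not an entity tag"
        }

        // Membership reads the same as with map[string]struct{} ...
        if pos, isEntity := entitySet["instance_id"]; isEntity {
            // ... but the value now also carries the tag's position.
            fmt.Println("instance_id is entity tag #", pos)
        }
    }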
diff --git a/pkg/query/logical/measure/topn_analyzer.go 
b/pkg/query/logical/measure/topn_analyzer.go
index eed24da5..1a52f4dd 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -23,8 +23,8 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
-       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 // TopNAnalyze converts logical expressions to executable operation tree 
represented by Plan.
@@ -48,7 +48,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, 
sourceMeasureSchema *databasev
 
        if criteria.GetAgg() != 0 {
                groupByProjectionTags := 
sourceMeasureSchema.GetEntity().GetTagNames()
-               groupByTags := 
[][]*logical.Tag{logical.NewTags(pkgschema.TopNTagFamily, 
groupByProjectionTags...)}
+               groupByTags := 
[][]*logical.Tag{logical.NewTags(measure.TopNTagFamily, 
groupByProjectionTags...)}
                plan = newUnresolvedGroupBy(plan, groupByTags, false)
                plan = newUnresolvedAggregation(plan,
                        &logical.Field{Name: topNAggSchema.FieldName},
@@ -97,7 +97,7 @@ func buildVirtualSchema(sourceMeasureSchema 
*databasev1.Measure, fieldName strin
                Metadata: sourceMeasureSchema.Metadata,
                TagFamilies: []*databasev1.TagFamilySpec{
                        {
-                               Name: pkgschema.TopNTagFamily,
+                               Name: measure.TopNTagFamily,
                                Tags: tags,
                        },
                },
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go 
b/pkg/query/logical/measure/topn_plan_localscan.go
index ee319af6..6eb19855 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -35,13 +35,12 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
-       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
        _               logical.UnresolvedPlan = (*unresolvedLocalScan)(nil)
-       fieldProjection                        = 
[]string{pkgschema.TopNFieldName}
+       fieldProjection                        = []string{measure.TopNFieldName}
 )
 
 type unresolvedLocalScan struct {
@@ -90,13 +89,13 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) 
(logical.Plan, error)
        return &localScan{
                s: s,
                options: model.MeasureQueryOptions{
-                       Name:      pkgschema.TopNSchemaName,
+                       Name:      measure.TopNSchemaName,
                        TimeRange: &tr,
                        Entities:  entities,
                        TagProjection: []model.TagProjection{
                                {
-                                       Family: pkgschema.TopNTagFamily,
-                                       Names:  pkgschema.TopNTagNames,
+                                       Family: measure.TopNTagFamily,
+                                       Names:  measure.TopNTagNames,
                                },
                        },
                        FieldProjection: fieldProjection,
@@ -214,7 +213,7 @@ func (ei *topNMIterator) Next() bool {
                                Version:   r.Versions[i],
                        }
                        tagFamily := &modelv1.TagFamily{
-                               Name: pkgschema.TopNTagFamily,
+                               Name: measure.TopNTagFamily,
                        }
                        dp.TagFamilies = append(dp.TagFamilies, tagFamily)
                        for k, entityName := range entityNames {
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index eda67111..d485c6ff 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -79,7 +79,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        default:
        }
        if i.result != nil {
-               return BuildElementsFromStreamResult(ctx, i.result), nil
+               return BuildElementsFromStreamResult(ctx, i.result)
        }
        var orderBy *index.OrderBy
        if i.order != nil {
@@ -104,7 +104,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        if i.result == nil {
                return nil, nil
        }
-       return BuildElementsFromStreamResult(ctx, i.result), nil
+       return BuildElementsFromStreamResult(ctx, i.result)
 }
 
 func (i *localIndexScan) String() string {
@@ -125,10 +125,19 @@ func (i *localIndexScan) Schema() logical.Schema {
 }
 
 // BuildElementsFromStreamResult builds a slice of elements from the given 
stream query result.
-func BuildElementsFromStreamResult(ctx context.Context, result 
model.StreamQueryResult) (elements []*streamv1.Element) {
-       r := result.Pull(ctx)
-       if r == nil {
-               return nil
+func BuildElementsFromStreamResult(ctx context.Context, result 
model.StreamQueryResult) (elements []*streamv1.Element, err error) {
+       var r *model.StreamResult
+       for {
+               r = result.Pull(ctx)
+               if r == nil {
+                       return nil, nil
+               }
+               if r.Error != nil {
+                       return nil, r.Error
+               }
+               if len(r.Timestamps) > 0 {
+                       break
+               }
        }
        for i := range r.Timestamps {
                e := &streamv1.Element{
@@ -150,5 +159,5 @@ func BuildElementsFromStreamResult(ctx context.Context, 
result model.StreamQuery
                }
                elements = append(elements, e)
        }
-       return elements
+       return elements, nil
 }
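
    Note: BuildElementsFromStreamResult now surfaces pull errors and keeps
    pulling past empty batches instead of returning after the first pull. A
    hedged, self-contained sketch of that loop contract follows; the result
    types are simplified stand-ins for pkg/query/model, not the real API.

    package main

    import (
        "context"
        "fmt"
    )

    type result struct {
        Timestamps []int64
        Error      error
    }

    type queryResult struct{ batches []*result }

    // Pull returns the next batch, or nil when the result is exhausted.
    func (q *queryResult) Pull(context.Context) *result {
        if len(q.batches) == 0 {
            return nil
        }
        r := q.batches[0]
        q.batches = q.batches[1:]
        return r
    }

    // firstBatch mirrors the loop above: nil ends the scan, a set Error
    // aborts it, and empty batches are skipped.
    func firstBatch(ctx context.Context, qr *queryResult) (*result, error) {
        for {
            r := qr.Pull(ctx)
            if r == nil {
                return nil, nil // exhausted without data
            }
            if r.Error != nil {
                return nil, r.Error
            }
            if len(r.Timestamps) > 0 {
                return r, nil // first non-empty batch
            }
        }
    }

    func main() {
        qr := &queryResult{batches: []*result{{}, {Timestamps: []int64{1, 2}}}}
        r, err := firstBatch(context.Background(), qr)
        fmt.Println(r.Timestamps, err) // [1 2] <nil>
    }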
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index e40f2581..7a31cf1b 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -91,6 +91,44 @@ type StreamQueryOptions struct {
        MaxElementSize int
 }
 
+// Reset resets the StreamQueryOptions.
+func (s *StreamQueryOptions) Reset() {
+       s.Name = ""
+       s.TimeRange = nil
+       s.Entities = nil
+       s.Filter = nil
+       s.Order = nil
+       s.TagProjection = nil
+       s.MaxElementSize = 0
+}
+
+// CopyFrom copies the StreamQueryOptions from other to s.
+func (s *StreamQueryOptions) CopyFrom(other *StreamQueryOptions) {
+       s.Name = other.Name
+       s.TimeRange = other.TimeRange
+
+       // Deep copy the Entities slice when it is non-nil
+       if other.Entities != nil {
+               s.Entities = make([][]*modelv1.TagValue, len(other.Entities))
+               copy(s.Entities, other.Entities)
+       } else {
+               s.Entities = nil
+       }
+
+       s.Filter = other.Filter
+       s.Order = other.Order
+
+       // Deep copy the TagProjection slice when it is non-nil
+       if other.TagProjection != nil {
+               s.TagProjection = make([]TagProjection, 
len(other.TagProjection))
+               copy(s.TagProjection, other.TagProjection)
+       } else {
+               s.TagProjection = nil
+       }
+
+       s.MaxElementSize = other.MaxElementSize
+}
+
 // StreamResult is the result of a query.
 type StreamResult struct {
        Error       error
@@ -201,6 +239,8 @@ func (sr *StreamResult) CopySingleFrom(other *StreamResult) 
{
        }
 }
 
+var bypassStreamResult = &StreamResult{}
+
 // StreamResultHeap is a min-heap of StreamResult pointers.
 type StreamResultHeap struct {
        data []*StreamResult
@@ -242,6 +282,10 @@ func MergeStreamResults(results []*StreamResult, topN int, 
asc bool) *StreamResu
                }
        }
 
+       if h.Len() == 0 {
+               return bypassStreamResult
+       }
+
        mergedResult := NewStreamResult(topN, asc)
 
        for h.Len() > 0 && mergedResult.Len() < topN {
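
    Note: Reset and CopyFrom make StreamQueryOptions reusable without aliasing
    the caller's Entities or TagProjection slices, and MergeStreamResults now
    short-circuits to a shared empty result when every input is empty. The
    sketch below shows the reset-then-copy reuse pattern with a simplified
    stand-in type; the sync.Pool usage is an assumption for illustration, not
    taken from this commit.

    package main

    import (
        "fmt"
        "sync"
    )

    type options struct {
        Name     string
        Entities [][]string
        MaxSize  int
    }

    func (o *options) Reset() {
        o.Name, o.Entities, o.MaxSize = "", nil, 0
    }

    func (o *options) CopyFrom(other *options) {
        o.Name = other.Name
        if other.Entities != nil {
            // Copy the outer slice so later appends on either side cannot
            // step on the other's backing array.
            o.Entities = make([][]string, len(other.Entities))
            copy(o.Entities, other.Entities)
        } else {
            o.Entities = nil
        }
        o.MaxSize = other.MaxSize
    }

    var pool = sync.Pool{New: func() any { return &options{} }}

    func main() {
        src := &options{Name: "sw", Entities: [][]string{{"svc"}}, MaxSize: 100}

        o := pool.Get().(*options)
        o.Reset()       // drop whatever a previous query left behind
        o.CopyFrom(src) // take a detached copy of the caller's options
        fmt.Println(o.Name, o.Entities, o.MaxSize)
        pool.Put(o)
    }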
diff --git a/pkg/query/model/model_test.go b/pkg/query/model/model_test.go
index 500757b8..92247e61 100644
--- a/pkg/query/model/model_test.go
+++ b/pkg/query/model/model_test.go
@@ -560,9 +560,9 @@ func TestMergeStreamResults(t *testing.T) {
                        results:  []*StreamResult{NewStreamResult(3, true), 
NewStreamResult(3, true)},
                        topN:     3,
                        asc:      true,
-                       wantTS:   []int64{},
-                       wantTags: []TagFamily{},
-                       wantSIDs: []common.SeriesID{},
+                       wantTS:   nil,
+                       wantTags: nil,
+                       wantSIDs: nil,
                },
                {
                        name: "one with data, one empty",
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 05c59d81..0f8e9615 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -41,53 +41,24 @@ import (
 var _ Resource = (*resourceSpec)(nil)
 
 type resourceSpec struct {
-       schema       ResourceSchema
-       delegated    io.Closer
-       indexRules   []*databasev1.IndexRule
-       aggregations []*databasev1.TopNAggregation
+       schema    ResourceSchema
+       delegated IndexListener
 }
 
-func (rs *resourceSpec) Delegated() io.Closer {
+func (rs *resourceSpec) Delegated() IndexListener {
        return rs.delegated
 }
 
-func (rs *resourceSpec) Close() error {
-       return rs.delegated.Close()
-}
-
 func (rs *resourceSpec) Schema() ResourceSchema {
        return rs.schema
 }
 
-func (rs *resourceSpec) IndexRules() []*databasev1.IndexRule {
-       return rs.indexRules
-}
-
-func (rs *resourceSpec) TopN() []*databasev1.TopNAggregation {
-       return rs.aggregations
-}
-
 func (rs *resourceSpec) maxRevision() int64 {
        return rs.schema.GetMetadata().GetModRevision()
 }
 
 func (rs *resourceSpec) isNewThan(other *resourceSpec) bool {
-       if other.maxRevision() > rs.maxRevision() {
-               return false
-       }
-       if len(rs.indexRules) != len(other.indexRules) {
-               return false
-       }
-       if len(rs.aggregations) != len(other.aggregations) {
-               return false
-       }
-       if parseMaxModRevision(other.indexRules) > 
parseMaxModRevision(rs.indexRules) {
-               return false
-       }
-       if parseMaxModRevision(other.aggregations) > 
parseMaxModRevision(rs.aggregations) {
-               return false
-       }
-       return true
+       return other.maxRevision() <= rs.maxRevision()
 }
 
 const maxWorkerNum = 8
@@ -112,6 +83,9 @@ type schemaRepo struct {
        metrics                *Metrics
        groupMap               sync.Map
        resourceMap            sync.Map
+       indexRuleMap           sync.Map
+       bindingForwardMap      sync.Map
+       bindingBackwardMap     sync.Map
        workerNum              int
        resourceMutex          sync.Mutex
        groupMux               sync.Mutex
@@ -200,25 +174,50 @@ func (sr *schemaRepo) Watcher() {
                                                switch evt.Kind {
                                                case EventKindGroup:
                                                        _, err = 
sr.storeGroup(evt.Metadata.GetMetadata())
-                                               case EventKindResource:
-                                                       err = 
sr.initResource(evt.Metadata.GetMetadata())
-                                               case EventKindTopNAgg:
-                                                       topNSchema := 
evt.Metadata.(*databasev1.TopNAggregation)
-                                                       err = 
createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), 
topNSchema.GetMetadata().Group)
-                                                       if err != nil {
-                                                               break
+                                                       if errors.Is(err, 
schema.ErrGRPCResourceNotFound) {
+                                                               err = nil
                                                        }
-                                                       err = 
sr.initResource(topNSchema.SourceMeasure)
+                                               case EventKindResource:
+                                                       err = 
sr.storeResource(evt.Metadata)
+                                               case EventKindIndexRule:
+                                                       indexRule := 
evt.Metadata.(*databasev1.IndexRule)
+                                                       
sr.storeIndexRule(indexRule)
+                                               case EventKindIndexRuleBinding:
+                                                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
+                                                       
sr.storeIndexRuleBinding(indexRuleBinding)
                                                }
                                        case EventDelete:
                                                switch evt.Kind {
                                                case EventKindGroup:
                                                        err = 
sr.deleteGroup(evt.Metadata.GetMetadata())
                                                case EventKindResource:
-                                                       err = 
sr.deleteResource(evt.Metadata.GetMetadata())
-                                               case EventKindTopNAgg:
-                                                       topNSchema := 
evt.Metadata.(*databasev1.TopNAggregation)
-                                                       err = 
sr.initResource(topNSchema.SourceMeasure)
+                                                       
sr.deleteResource(evt.Metadata.GetMetadata())
+                                               case EventKindIndexRule:
+                                                       key := 
getKey(evt.Metadata.GetMetadata())
+                                                       
sr.indexRuleMap.Delete(key)
+                                               case EventKindIndexRuleBinding:
+                                                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
+                                                       col, _ := 
sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
+                                                               Name:  
indexRuleBinding.Subject.GetName(),
+                                                               Group: 
indexRuleBinding.GetMetadata().GetGroup(),
+                                                       }))
+                                                       if col == nil {
+                                                               break
+                                                       }
+                                                       tMap := col.(*sync.Map)
+                                                       key := 
getKey(indexRuleBinding.GetMetadata())
+                                                       tMap.Delete(key)
+                                                       for i := range 
indexRuleBinding.Rules {
+                                                               col, _ := 
sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
+                                                                       Name:  
indexRuleBinding.Rules[i],
+                                                                       Group: 
indexRuleBinding.GetMetadata().GetGroup(),
+                                                               }))
+                                                               if col == nil {
+                                                                       continue
+                                                               }
+                                                               tMap := 
col.(*sync.Map)
+                                                               tMap.Delete(key)
+                                                       }
                                                }
                                        }
                                        if err != nil && !errors.Is(err, 
schema.ErrClosed) {
@@ -320,17 +319,13 @@ func (sr *schemaRepo) LoadResource(metadata 
*commonv1.Metadata) (Resource, bool)
        return s.(Resource), true
 }
 
-func (sr *schemaRepo) storeResource(g Group, stm ResourceSchema,
-       idxRules []*databasev1.IndexRule, topNAggrs 
[]*databasev1.TopNAggregation,
-) error {
+func (sr *schemaRepo) storeResource(resourceSchema ResourceSchema) error {
        sr.resourceMutex.Lock()
        defer sr.resourceMutex.Unlock()
        resource := &resourceSpec{
-               schema:       stm,
-               indexRules:   idxRules,
-               aggregations: topNAggrs,
+               schema: resourceSchema,
        }
-       key := getKey(stm.GetMetadata())
+       key := getKey(resourceSchema.GetMetadata())
        pre, loadedPre := sr.resourceMap.Load(key)
        var preResource *resourceSpec
        if loadedPre {
@@ -339,71 +334,119 @@ func (sr *schemaRepo) storeResource(g Group, stm 
ResourceSchema,
        if loadedPre && preResource.isNewThan(resource) {
                return nil
        }
-       var dbSupplier Supplier
-       if !g.(*group).isPortable() {
-               dbSupplier = g
-       }
-       sm, err := 
sr.resourceSchemaSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum,
 dbSupplier, resource)
+       sm, err := sr.resourceSchemaSupplier.OpenResource(resource)
        if err != nil {
                return errors.WithMessage(err, "fails to open the resource")
        }
+       sm.OnIndexUpdate(sr.indexRules(resourceSchema))
        resource.delegated = sm
        sr.resourceMap.Store(key, resource)
-       if loadedPre {
-               return preResource.Close()
-       }
        return nil
 }
 
-func getKey(metadata *commonv1.Metadata) string {
-       return path.Join(metadata.GetGroup(), metadata.GetName())
+func (sr *schemaRepo) storeIndexRule(indexRule *databasev1.IndexRule) {
+       key := getKey(indexRule.GetMetadata())
+       if prev, loaded := sr.indexRuleMap.LoadOrStore(key, indexRule); loaded {
+               if prev.(*databasev1.IndexRule).GetMetadata().ModRevision <= 
indexRule.GetMetadata().ModRevision {
+                       sr.indexRuleMap.Store(key, indexRule)
+                       if col, _ := sr.bindingBackwardMap.Load(key); col != 
nil {
+                               col.(*sync.Map).Range(func(_, value any) bool {
+                                       
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
+                                       return true
+                               })
+                       }
+               }
+       } else {
+               if col, _ := sr.bindingBackwardMap.Load(key); col != nil {
+                       col.(*sync.Map).Range(func(_, value any) bool {
+                               
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
+                               return true
+                       })
+               }
+       }
 }
 
-func (sr *schemaRepo) initResource(metadata *commonv1.Metadata) error {
-       g, ok := sr.LoadGroup(metadata.Group)
-       if !ok {
-               var err error
-               if g, err = sr.storeGroup(&commonv1.Metadata{Name: 
metadata.Group}); err != nil {
-                       return errors.WithMessagef(err, "create unknown 
group:%s", metadata.Group)
+func (sr *schemaRepo) storeIndexRuleBinding(indexRuleBinding 
*databasev1.IndexRuleBinding) {
+       var changed bool
+       col, _ := sr.bindingForwardMap.LoadOrStore(getKey(&commonv1.Metadata{
+               Name:  indexRuleBinding.Subject.GetName(),
+               Group: indexRuleBinding.GetMetadata().GetGroup(),
+       }), &sync.Map{})
+       tMap := col.(*sync.Map)
+       key := getKey(indexRuleBinding.GetMetadata())
+       if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); loaded {
+               if 
prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= 
indexRuleBinding.GetMetadata().ModRevision {
+                       tMap.Store(key, indexRuleBinding)
+                       changed = true
                }
-       }
-       stm, err := sr.resourceSchemaSupplier.ResourceSchema(metadata)
-       if err != nil {
-               if errors.Is(err, schema.ErrGRPCResourceNotFound) {
-                       if dl := sr.l.Debug(); dl.Enabled() {
-                               dl.Interface("metadata", 
metadata).Msg("resource not found")
+       } else {
+               changed = true
+       }
+       for i := range indexRuleBinding.Rules {
+               col, _ := 
sr.bindingBackwardMap.LoadOrStore(getKey(&commonv1.Metadata{
+                       Name:  indexRuleBinding.Rules[i],
+                       Group: indexRuleBinding.GetMetadata().GetGroup(),
+               }), &sync.Map{})
+               tMap := col.(*sync.Map)
+               key := getKey(indexRuleBinding.GetMetadata())
+               if prev, loaded := tMap.LoadOrStore(key, indexRuleBinding); 
loaded {
+                       if 
prev.(*databasev1.IndexRuleBinding).GetMetadata().ModRevision <= 
indexRuleBinding.GetMetadata().ModRevision {
+                               tMap.Store(key, indexRuleBinding)
+                               changed = true
                        }
-                       return nil
+               } else {
+                       changed = true
                }
-               return errors.WithMessage(err, "fails to get the resource")
        }
-       ctx := context.Background()
-       localCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-       defer cancel()
-       idxRules, err := sr.metadata.IndexRules(localCtx, stm.GetMetadata())
-       if err != nil {
-               return err
+       if !changed {
+               return
        }
-       var topNAggrs []*databasev1.TopNAggregation
-       if _, ok := stm.(*databasev1.Measure); ok {
-               localCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
-               var innerErr error
-               topNAggrs, innerErr = 
sr.metadata.MeasureRegistry().TopNAggregations(localCtx, stm.GetMetadata())
-               cancel()
-               if innerErr != nil {
-                       return errors.WithMessage(innerErr, "fails to get the 
topN aggregations")
-               }
+       sr.updateIndex(indexRuleBinding)
+}
+
+func (sr *schemaRepo) updateIndex(binding *databasev1.IndexRuleBinding) {
+       if r, ok := sr.LoadResource(&commonv1.Metadata{
+               Name:  binding.Subject.GetName(),
+               Group: binding.GetMetadata().GetGroup(),
+       }); ok {
+               r.Delegated().OnIndexUpdate(sr.indexRules(r.Schema()))
        }
-       return sr.storeResource(g, stm, idxRules, topNAggrs)
 }
 
-func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error {
-       key := getKey(metadata)
-       pre, loaded := sr.resourceMap.LoadAndDelete(key)
-       if !loaded {
+func (sr *schemaRepo) indexRules(schema ResourceSchema) 
[]*databasev1.IndexRule {
+       n := schema.GetMetadata().GetName()
+       g := schema.GetMetadata().GetGroup()
+       col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
+               Name:  n,
+               Group: g,
+       }))
+       if col == nil {
                return nil
        }
-       return pre.(Resource).Close()
+       tMap := col.(*sync.Map)
+       var indexRules []*databasev1.IndexRule
+       tMap.Range(func(_, value any) bool {
+               indexRuleBinding := value.(*databasev1.IndexRuleBinding)
+               for i := range indexRuleBinding.Rules {
+                       if r, _ := 
sr.indexRuleMap.Load(getKey(&commonv1.Metadata{
+                               Name:  indexRuleBinding.Rules[i],
+                               Group: g,
+                       })); r != nil {
+                               indexRules = append(indexRules, 
r.(*databasev1.IndexRule))
+                       }
+               }
+               return true
+       })
+       return indexRules
+}
+
+func getKey(metadata *commonv1.Metadata) string {
+       return path.Join(metadata.GetGroup(), metadata.GetName())
+}
+
+func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) {
+       key := getKey(metadata)
+       _, _ = sr.resourceMap.LoadAndDelete(key)
 }
 
 func (sr *schemaRepo) Close() {
@@ -415,23 +458,6 @@ func (sr *schemaRepo) Close() {
        sr.closer.CloseThenWait()
        close(sr.eventCh)
 
-       sr.resourceMutex.Lock()
-       sr.resourceMap.Range(func(_, value any) bool {
-               if value == nil {
-                       return true
-               }
-               r, ok := value.(*resourceSpec)
-               if !ok {
-                       return true
-               }
-               err := r.Close()
-               if err != nil {
-                       sr.l.Err(err).RawJSON("resource", 
logger.Proto(r.Schema().GetMetadata())).Msg("closing")
-               }
-               return true
-       })
-       sr.resourceMutex.Unlock()
-
        sr.groupMux.Lock()
        defer sr.groupMux.Unlock()
        sr.groupMap.Range(func(_, value any) bool {
@@ -523,13 +549,3 @@ func (g *group) close() (err error) {
        }
        return multierr.Append(err, g.SupplyTSDB().Close())
 }
-
-func parseMaxModRevision[T ResourceSchema](indexRules []T) 
(maxRevisionForIdxRules int64) {
-       maxRevisionForIdxRules = int64(0)
-       for _, idxRule := range indexRules {
-               if idxRule.GetMetadata().GetModRevision() > 
maxRevisionForIdxRules {
-                       maxRevisionForIdxRules = 
idxRule.GetMetadata().GetModRevision()
-               }
-       }
-       return
-}
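
    Note: the repository now keeps two lookup maps for index rule bindings:
    bindingForwardMap keyed by the binding's subject (the measure or stream)
    and bindingBackwardMap keyed by each referenced index rule, so it can
    answer both "which rules apply to this resource" and "which resources must
    be refreshed when this rule changes" without scanning. A minimal sketch of
    that two-way index with plain maps and illustrative names (no concurrency,
    unlike the sync.Map-based code above):

    package main

    import "fmt"

    type binding struct {
        name    string
        subject string   // the measure/stream the binding targets
        rules   []string // index rule names referenced by the binding
    }

    func main() {
        forward := map[string][]binding{}  // subject name -> bindings
        backward := map[string][]binding{} // rule name -> bindings

        b := binding{name: "b1", subject: "sw_metric", rules: []string{"trace_id"}}
        forward[b.subject] = append(forward[b.subject], b)
        for _, r := range b.rules {
            backward[r] = append(backward[r], b)
        }

        // "Which bindings, and hence rules, apply to sw_metric?"
        fmt.Println(forward["sw_metric"])

        // "Which resources need OnIndexUpdate when rule trace_id changes?"
        for _, bd := range backward["trace_id"] {
            fmt.Println("refresh", bd.subject)
        }
    }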
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 5050e4bc..8cbe0a12 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -22,32 +22,12 @@ import (
        "fmt"
        "time"
 
-       "github.com/pkg/errors"
-
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-const (
-       // TopNTagFamily is the tag family name of the topN result measure.
-       TopNTagFamily = "_topN"
-       // TopNFieldName is the field name of the topN result measure.
-       TopNFieldName = "value"
-)
-
-var (
-       initTimeout    = 10 * time.Second
-       topNFieldsSpec = []*databasev1.FieldSpec{{
-               Name:              TopNFieldName,
-               FieldType:         databasev1.FieldType_FIELD_TYPE_DATA_BINARY,
-               EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
-               CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
-       }}
-       // TopNTagNames is the tag names of the topN result measure.
-       TopNTagNames = []string{"name", "direction", "group"}
-)
+var initTimeout = 10 * time.Second
 
 type revisionContext struct {
        group            int64
@@ -67,18 +47,19 @@ type revisionContextKey struct{}
 
 var revCtxKey = revisionContextKey{}
 
-func (sr *schemaRepo) Init(kind schema.Kind) []int64 {
+func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) {
        if kind != schema.KindMeasure && kind != schema.KindStream {
-               return nil
+               return nil, nil
        }
        catalog := sr.getCatalog(kind)
        ctx := context.Background()
        groups, err := sr.metadata.GroupRegistry().ListGroup(ctx)
        if err != nil {
                logger.Panicf("fails to get the groups: %v", err)
-               return nil
+               return nil, nil
        }
        var revCtx revisionContext
+       groupNames := make([]string, 0, len(groups))
        for _, g := range groups {
                if g.Catalog != catalog {
                        continue
@@ -87,13 +68,14 @@ func (sr *schemaRepo) Init(kind schema.Kind) []int64 {
                        revCtx.group = g.Metadata.ModRevision
                }
                sr.processGroup(context.WithValue(ctx, revCtxKey, &revCtx), g, 
catalog)
+               groupNames = append(groupNames, g.Metadata.Name)
        }
        if kind == schema.KindMeasure {
                sr.l.Info().Stringer("revision", revCtx).Msg("init measures")
-               return []int64{revCtx.group, revCtx.measure, 
revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg}
+               return groupNames, []int64{revCtx.group, revCtx.measure, 
revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg}
        }
        sr.l.Info().Stringer("revision", revCtx).Msg("init stream")
-       return []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, 
revCtx.indexRule}
+       return groupNames, []int64{revCtx.group, revCtx.stream, 
revCtx.indexRuleBinding, revCtx.indexRule}
 }
 
 func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog {
@@ -104,20 +86,20 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) 
commonv1.Catalog {
 }
 
 func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, 
catalog commonv1.Catalog) {
-       group, err := sr.initGroup(g)
+       _, err := sr.initGroup(g)
        if err != nil {
                logger.Panicf("fails to init the group: %v", err)
        }
-       bindings := sr.getBindings(ctx, g.Metadata.GetName())
-       rules := sr.getRules(ctx, g.Metadata.GetName())
+       sr.processRules(ctx, g.Metadata.GetName())
+       sr.processBindings(ctx, g.Metadata.GetName())
        if catalog == commonv1.Catalog_CATALOG_MEASURE {
-               sr.processMeasure(ctx, g.Metadata.Name, group, bindings, rules)
+               sr.processMeasure(ctx, g.Metadata.Name)
                return
        }
-       sr.processStream(ctx, g.Metadata.Name, group, bindings, rules)
+       sr.processStream(ctx, g.Metadata.Name)
 }
 
-func (sr *schemaRepo) getBindings(ctx context.Context, gName string) 
map[string][]string {
+func (sr *schemaRepo) processBindings(ctx context.Context, gName string) {
        ctx, cancel := context.WithTimeout(ctx, initTimeout)
        defer cancel()
        start := time.Now()
@@ -126,18 +108,16 @@ func (sr *schemaRepo) getBindings(ctx context.Context, 
gName string) map[string]
                logger.Panicf("fails to get the index rule bindings: %v", err)
        }
        revCtx := ctx.Value(revCtxKey).(*revisionContext)
-       bindings := make(map[string][]string, len(ibb))
        for _, ib := range ibb {
-               bindings[ib.GetSubject().GetName()] = ib.GetRules()
+               sr.storeIndexRuleBinding(ib)
                if ib.Metadata.ModRevision > revCtx.indexRuleBinding {
                        revCtx.indexRuleBinding = ib.Metadata.ModRevision
                }
        }
-       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(bindings)).Msg("get index rule bindings")
-       return bindings
+       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(ibb)).Msg("get index rule bindings")
 }
 
-func (sr *schemaRepo) getRules(ctx context.Context, gName string) 
map[string]*databasev1.IndexRule {
+func (sr *schemaRepo) processRules(ctx context.Context, gName string) {
        ctx, cancel := context.WithTimeout(ctx, initTimeout)
        defer cancel()
        start := time.Now()
@@ -146,26 +126,16 @@ func (sr *schemaRepo) getRules(ctx context.Context, gName 
string) map[string]*da
                logger.Panicf("fails to get the index rules: %v", err)
        }
        revCtx := ctx.Value(revCtxKey).(*revisionContext)
-       rules := make(map[string]*databasev1.IndexRule, len(rr))
        for _, r := range rr {
-               rules[r.Metadata.Name] = r
+               sr.storeIndexRule(r)
                if r.Metadata.ModRevision > revCtx.indexRule {
                        revCtx.indexRule = r.Metadata.ModRevision
                }
        }
-       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(rules)).Msg("get index rules")
-       return rules
+       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(rr)).Msg("get index rules")
 }
 
-func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group 
*group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) {
-       aggMap := sr.getAggMap(ctx, gName)
-       if len(aggMap) > 0 {
-               if err := createTopNResultMeasure(ctx, 
sr.metadata.MeasureRegistry(), gName); err != nil {
-                       logger.Panicf("fails to create the topN result measure: 
%v", err)
-                       return
-               }
-       }
-
+func (sr *schemaRepo) processMeasure(ctx context.Context, gName string) {
        ctx, cancel := context.WithTimeout(ctx, initTimeout)
        defer cancel()
        start := time.Now()
@@ -180,58 +150,14 @@ func (sr *schemaRepo) processMeasure(ctx context.Context, 
gName string, group *g
                if m.Metadata.ModRevision > revCtx.measure {
                        revCtx.measure = m.Metadata.ModRevision
                }
-               sr.storeMeasure(m, group, bindings, rules, aggMap)
-       }
-       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(mm)).Msg("store measures")
-}
-
-func (sr *schemaRepo) getAggMap(ctx context.Context, gName string) 
map[string][]*databasev1.TopNAggregation {
-       ctx, cancel := context.WithTimeout(ctx, initTimeout)
-       defer cancel()
-       start := time.Now()
-       agg, err := 
sr.metadata.TopNAggregationRegistry().ListTopNAggregation(ctx, 
schema.ListOpt{Group: gName})
-       if err != nil {
-               logger.Panicf("fails to get the topN aggregations: %v", err)
-       }
-       revCtx := ctx.Value(revCtxKey).(*revisionContext)
-       aggMap := make(map[string][]*databasev1.TopNAggregation, len(agg))
-       for _, a := range agg {
-               aggs, ok := aggMap[a.SourceMeasure.Name]
-               if ok {
-                       aggs = append(aggs, a)
-               } else {
-                       aggs = []*databasev1.TopNAggregation{a}
-               }
-               aggMap[a.SourceMeasure.Name] = aggs
-               if a.Metadata.ModRevision > revCtx.topNAgg {
-                       revCtx.topNAgg = a.Metadata.ModRevision
-               }
-       }
-       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(agg)).Msg("get topN aggregations")
-       return aggMap
-}
-
-func (sr *schemaRepo) storeMeasure(m *databasev1.Measure, group *group, 
bindings map[string][]string,
-       rules map[string]*databasev1.IndexRule, aggMap 
map[string][]*databasev1.TopNAggregation,
-) {
-       var indexRules []*databasev1.IndexRule
-       if rr, ok := bindings[m.Metadata.GetName()]; ok {
-               for _, r := range rr {
-                       if rule, ok := rules[r]; ok {
-                               indexRules = append(indexRules, rule)
-                       }
+               if err := sr.storeResource(m); err != nil {
+                       logger.Panicf("fails to store the measure: %v", err)
                }
        }
-       var topNAggr []*databasev1.TopNAggregation
-       if aa, ok := aggMap[m.Metadata.GetName()]; ok {
-               topNAggr = append(topNAggr, aa...)
-       }
-       if err := sr.storeResource(group, m, indexRules, topNAggr); err != nil {
-               logger.Panicf("fails to store the measure: %v", err)
-       }
+       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(mm)).Msg("store measures")
 }
 
-func (sr *schemaRepo) processStream(ctx context.Context, gName string, group 
*group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) {
+func (sr *schemaRepo) processStream(ctx context.Context, gName string) {
        ctx, cancel := context.WithTimeout(ctx, initTimeout)
        defer cancel()
        start := time.Now()
@@ -242,7 +168,9 @@ func (sr *schemaRepo) processStream(ctx context.Context, 
gName string, group *gr
        }
        revCtx := ctx.Value(revCtxKey).(*revisionContext)
        for _, s := range ss {
-               sr.storeStream(s, group, bindings, rules)
+               if err := sr.storeResource(s); err != nil {
+                       logger.Panicf("fails to store the stream: %v", err)
+               }
                if s.Metadata.ModRevision > revCtx.stream {
                        revCtx.stream = s.Metadata.ModRevision
                }
@@ -250,20 +178,6 @@ func (sr *schemaRepo) processStream(ctx context.Context, 
gName string, group *gr
        sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(ss)).Msg("store streams")
 }
 
-func (sr *schemaRepo) storeStream(s *databasev1.Stream, group *group, bindings 
map[string][]string, rules map[string]*databasev1.IndexRule) {
-       var indexRules []*databasev1.IndexRule
-       if rr, ok := bindings[s.Metadata.GetName()]; ok {
-               for _, r := range rr {
-                       if rule, ok := rules[r]; ok {
-                               indexRules = append(indexRules, rule)
-                       }
-               }
-       }
-       if err := sr.storeResource(group, s, indexRules, nil); err != nil {
-               logger.Panicf("fails to store the stream: %v", err)
-       }
-}
-
 func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
        g, ok := sr.getGroup(groupSchema.Metadata.Name)
        if ok {
@@ -276,51 +190,3 @@ func (sr *schemaRepo) initGroup(groupSchema 
*commonv1.Group) (*group, error) {
        }
        return g, nil
 }
-
-func createTopNResultMeasure(ctx context.Context, measureSchemaRegistry 
schema.Measure, group string) error {
-       md := GetTopNSchemaMetadata(group)
-       m, err := measureSchemaRegistry.GetMeasure(ctx, md)
-       if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
-               return errors.WithMessagef(err, "fail to get %s", md)
-       }
-       if m != nil {
-               return nil
-       }
-
-       m = GetTopNSchema(md)
-       if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr 
!= nil {
-               if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
-                       return errors.WithMessagef(innerErr, "fail to create 
new topN measure %s", m)
-               }
-       }
-       return nil
-}
-
-// GetTopNSchema returns the schema of the topN result measure.
-func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure {
-       return &databasev1.Measure{
-               Metadata: md,
-               TagFamilies: []*databasev1.TagFamilySpec{
-                       {
-                               Name: TopNTagFamily,
-                               Tags: []*databasev1.TagSpec{
-                                       {Name: TopNTagNames[0], Type: 
databasev1.TagType_TAG_TYPE_STRING},
-                                       {Name: TopNTagNames[1], Type: 
databasev1.TagType_TAG_TYPE_INT},
-                                       {Name: TopNTagNames[2], Type: 
databasev1.TagType_TAG_TYPE_STRING},
-                               },
-                       },
-               },
-               Fields: topNFieldsSpec,
-               Entity: &databasev1.Entity{
-                       TagNames: TopNTagNames,
-               },
-       }
-}
-
-// GetTopNSchemaMetadata returns the metadata of the topN result measure.
-func GetTopNSchemaMetadata(group string) *commonv1.Metadata {
-       return &commonv1.Metadata{
-               Name:  TopNSchemaName,
-               Group: group,
-       }
-}
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 9415539d..50c455b9 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -26,9 +26,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
-// TopNSchemaName is the name of the top n result schema.
-const TopNSchemaName = "_top_n_result"
-
 // EventType defines actions of events.
 type EventType uint8
 
@@ -46,7 +43,8 @@ type EventKind uint8
 const (
        EventKindGroup EventKind = iota
        EventKindResource
-       EventKindTopNAgg
+       EventKindIndexRule
+       EventKindIndexRuleBinding
 )
 
 // Group is the root node, allowing get resources from its sub nodes.
@@ -69,11 +67,8 @@ type ResourceSchema interface {
 
 // Resource allows access metadata from a local cache.
 type Resource interface {
-       io.Closer
-       IndexRules() []*databasev1.IndexRule
-       TopN() []*databasev1.TopNAggregation
        Schema() ResourceSchema
-       Delegated() io.Closer
+       Delegated() IndexListener
 }
 
 // Supplier allows open a tsdb.
@@ -81,10 +76,15 @@ type Supplier interface {
        SupplyTSDB() io.Closer
 }
 
+// IndexListener listens for index rule updates.
+type IndexListener interface {
+       OnIndexUpdate(index []*databasev1.IndexRule)
+}
+
 // ResourceSchemaSupplier allows get a ResourceSchema from the metadata.
 type ResourceSchemaSupplier interface {
        ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error)
-       OpenResource(shardNum uint32, supplier Supplier, spec Resource) 
(io.Closer, error)
+       OpenResource(spec Resource) (IndexListener, error)
 }
 
 // ResourceSupplier allows open a resource and its embedded tsdb.
@@ -102,7 +102,7 @@ type DB interface {
 // Repository is the collection of several hierarchies groups by a "Group".
 type Repository interface {
        Watcher()
-       Init(schema.Kind) []int64
+       Init(schema.Kind) ([]string, []int64)
        SendMetadataEvent(MetadataEvent)
        LoadGroup(name string) (Group, bool)
        LoadResource(metadata *commonv1.Metadata) (Resource, bool)
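
    Note: Resource.Delegated now returns an IndexListener instead of an
    io.Closer, so the cache only needs OnIndexUpdate to push freshly resolved
    rules into an opened resource, as storeResource does right after
    OpenResource. A hedged sketch of that handshake with stand-in types; none
    of the names below are the real API.

    package main

    import "fmt"

    type indexRule struct{ name string }

    // IndexListener mirrors the interface added in pkg/schema/schema.go.
    type IndexListener interface {
        OnIndexUpdate(rules []*indexRule)
    }

    // measure is a toy resource that just remembers its current rules.
    type measure struct{ rules []*indexRule }

    func (m *measure) OnIndexUpdate(rules []*indexRule) { m.rules = rules }

    // openResource stands in for ResourceSchemaSupplier.OpenResource: it
    // returns the opened resource as an IndexListener so the repository can
    // keep pushing rule changes to it later.
    func openResource(name string) (IndexListener, error) {
        return &measure{}, nil
    }

    func main() {
        listener, err := openResource("sw_metric")
        if err != nil {
            panic(err)
        }
        // The repository resolves the bound rules and pushes them right
        // after opening, and again whenever a rule or binding changes.
        listener.OnIndexUpdate([]*indexRule{{name: "trace_id"}})
        fmt.Println("rules pushed:", len(listener.(*measure).rules)) // 1
    }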
diff --git a/scripts/ci/check/version_test.go b/scripts/ci/check/version_test.go
index 26555a59..a5855cb5 100644
--- a/scripts/ci/check/version_test.go
+++ b/scripts/ci/check/version_test.go
@@ -31,7 +31,7 @@ import (
 )
 
 const (
-       GoVersion = "1.23.0"
+       GoVersion = "1.23.5"
        CPUType   = 8
 )
 
diff --git a/test/integration/standalone/cold_query/query_suite_test.go 
b/test/integration/standalone/cold_query/query_suite_test.go
index a28c145e..36f03901 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -89,7 +89,9 @@ var _ = SynchronizedAfterSuite(func() {
                Expect(connection.Close()).To(Succeed())
        }
 }, func() {
-       deferFunc()
+       if deferFunc != nil {
+               deferFunc()
+       }
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
        Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git a/test/integration/standalone/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
index 765cd893..664f4071 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -90,7 +90,9 @@ var _ = SynchronizedAfterSuite(func() {
                Expect(connection.Close()).To(Succeed())
        }
 }, func() {
-       deferFunc()
+       if deferFunc != nil {
+               deferFunc()
+       }
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
        Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
