This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e1ba421bd Support client watch the schema server updates (#995)
e1ba421bd is described below

commit e1ba421bd624727760c7a69c84c6fe55878fb526
Author: mrproliu <[email protected]>
AuthorDate: Tue Mar 10 16:18:16 2026 +0800

    Support client watch the schema server updates (#995)
    
    * Support client watch the schema server updates
---
 api/proto/banyandb/schema/v1/internal.proto        |  26 +-
 banyand/metadata/schema/property/client.go         | 657 +++++++++++----------
 banyand/metadata/schema/property/client_test.go    | 511 +++++++++++-----
 banyand/metadata/schema/property/converter.go      |  67 ++-
 banyand/metadata/schema/property/converter_test.go |  35 +-
 banyand/metadata/schema/property/schema_client.go  |   9 +-
 banyand/metadata/schema/schemaserver/grpc.go       | 164 ++++-
 banyand/metadata/schema/schemaserver/grpc_test.go  | 169 +++++-
 banyand/metadata/schema/schemaserver/service.go    |  27 +-
 banyand/metadata/schema/schemaserver/watcher.go    | 105 ++++
 .../metadata/schema/schemaserver/watcher_test.go   | 115 ++++
 docs/api-reference.md                              |  87 +--
 pkg/test/setup/setup.go                            |   6 +-
 .../cases/cluster/docker-compose-property.yml      |   4 +-
 .../event/banyandb/docker-compose-property.yml     |   2 +-
 .../trace/banyandb/docker-compose-property.yml     |   2 +-
 .../storage/banyandb/docker-compose-property.yml   |   2 +-
 .../distributed/lifecycle/property/suite_test.go   |   1 -
 18 files changed, 1389 insertions(+), 600 deletions(-)

diff --git a/api/proto/banyandb/schema/v1/internal.proto 
b/api/proto/banyandb/schema/v1/internal.proto
index f753411ff..9d5908d1c 100644
--- a/api/proto/banyandb/schema/v1/internal.proto
+++ b/api/proto/banyandb/schema/v1/internal.proto
@@ -19,6 +19,7 @@ syntax = "proto3";
 
 package banyandb.schema.v1;
 
+import "banyandb/model/v1/query.proto";
 import "banyandb/property/v1/property.proto";
 import "banyandb/property/v1/rpc.proto";
 import "google/protobuf/timestamp.proto";
@@ -74,15 +75,26 @@ service SchemaManagementService {
   rpc RepairSchema(RepairSchemaRequest) returns (RepairSchemaResponse);
 }
 
-message AggregateSchemaUpdatesRequest {
-  property.v1.QueryRequest query = 1;
+service SchemaUpdateService {
+  rpc WatchSchemas(stream WatchSchemasRequest) returns (stream 
WatchSchemasResponse);
 }
 
-message AggregateSchemaUpdatesResponse {
-  // which names/schemas has updates
-  repeated string names = 1;
+enum SchemaEventType {
+  SCHEMA_EVENT_TYPE_UNSPECIFIED = 0;
+  SCHEMA_EVENT_TYPE_INSERT = 1;
+  SCHEMA_EVENT_TYPE_UPDATE = 2;
+  SCHEMA_EVENT_TYPE_DELETE = 3;
+  SCHEMA_EVENT_TYPE_REPLAY_DONE = 4;
 }
 
-service SchemaUpdateService {
-  rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns 
(AggregateSchemaUpdatesResponse);
+message WatchSchemasRequest {
+  model.v1.Criteria criteria = 1;
+  repeated string tag_projection = 2;
+}
+
+message WatchSchemasResponse {
+  SchemaEventType event_type = 1;
+  property.v1.Property property = 2;
+  bool metadata_only = 3;
+  int64 delete_time = 4;
 }
diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index 733715de9..19b6d42fe 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -34,6 +34,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
        "github.com/apache/skywalking-banyandb/api/validate"
@@ -73,6 +74,29 @@ type ClientConfig struct {
        TLSEnabled         bool
 }
 
+type syncRequest struct {
+       criteria      *modelv1.Criteria
+       tagProjection []string
+}
+
+type watchSession struct {
+       cancelFn  context.CancelFunc
+       syncReqCh chan *syncRequest
+       syncResCh chan []*digestEntry
+}
+
+type digestEntry struct {
+       propID     string
+       kind       string
+       group      string
+       name       string
+       revision   int64
+       deleteTime int64
+}
+
+// DefaultFullReconcileEvery is the default number of sync rounds between full 
reconciliations.
+const DefaultFullReconcileEvery = 5
+
 // SchemaRegistry implements schema.Registry using property-based schema 
servers.
 type SchemaRegistry struct {
        connMgr            *grpchelper.ConnManager[*schemaClient]
@@ -81,11 +105,12 @@ type SchemaRegistry struct {
        cache              *schemaCache
        caCertReloader     *pkgtls.Reloader
        handlers           map[schema.Kind][]schema.EventHandler
+       watchSessions      map[string]*watchSession
        syncInterval       time.Duration
        fullReconcileEvery uint64
        syncRound          uint64
-       started            atomic.Bool
        mux                sync.RWMutex
+       watchMu            sync.Mutex
 }
 
 // NewSchemaRegistryClient creates a new property-based schema registry client.
@@ -118,8 +143,8 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                initWaitTime = DefaultInitWaitTime
        }
        fullReconcileEvery := cfg.FullReconcileEvery
-       if fullReconcileEvery == 0 {
-               fullReconcileEvery = 5
+       if fullReconcileEvery <= 0 {
+               fullReconcileEvery = DefaultFullReconcileEvery
        }
        reg := &SchemaRegistry{
                connMgr:            connMgr,
@@ -128,9 +153,11 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                cache:              newSchemaCache(),
                caCertReloader:     caCertReloader,
                handlers:           make(map[schema.Kind][]schema.EventHandler),
+               watchSessions:      make(map[string]*watchSession),
                syncInterval:       syncInterval,
                fullReconcileEvery: fullReconcileEvery,
        }
+       handler.registry = reg
 
        if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
                connMgr.OnAddOrUpdate(cfg.CurNode)
@@ -258,64 +285,11 @@ func (r *SchemaRegistry) broadcastInsert(ctx 
context.Context, prop *propertyv1.P
        return nil
 }
 
-func (r *SchemaRegistry) forEachActiveNode(fn func(nodeName string, c 
*schemaClient) error, errMsg string) {
-       names := r.connMgr.ActiveNames()
-       for _, nodeName := range names {
-               currentNode := nodeName
-               execErr := r.connMgr.Execute(currentNode, func(c *schemaClient) 
error {
-                       return fn(currentNode, c)
-               })
-               if execErr != nil {
-                       r.l.Warn().Err(execErr).Str("node", 
currentNode).Msg(errMsg)
-               }
-       }
-}
-
 type schemaWithDeleteTime struct {
        property   *propertyv1.Property
        deleteTime int64
 }
 
-type syncCollector struct {
-       entries        map[schema.Kind]map[string]*schemaWithDeleteTime
-       queriedServers map[schema.Kind]map[string]bool
-       mu             sync.Mutex
-}
-
-func newSyncCollector() *syncCollector {
-       return &syncCollector{
-               entries:        
make(map[schema.Kind]map[string]*schemaWithDeleteTime),
-               queriedServers: make(map[schema.Kind]map[string]bool),
-       }
-}
-
-func (c *syncCollector) add(serverName string, kind schema.Kind, schemas 
[]*schemaWithDeleteTime) {
-       c.mu.Lock()
-       defer c.mu.Unlock()
-       if c.entries[kind] == nil {
-               c.entries[kind] = make(map[string]*schemaWithDeleteTime)
-       }
-       if c.queriedServers[kind] == nil {
-               c.queriedServers[kind] = make(map[string]bool)
-       }
-       c.queriedServers[kind][serverName] = true
-       for _, s := range schemas {
-               propID := s.property.GetId()
-               existing, exists := c.entries[kind][propID]
-               if !exists {
-                       c.entries[kind][propID] = s
-                       continue
-               }
-               existingRev := ParseTags(existing.property.GetTags()).UpdatedAt
-               newRev := ParseTags(s.property.GetTags()).UpdatedAt
-               // Prefer higher revision. If same revision, prefer non-deleted 
over deleted.
-               if newRev > existingRev ||
-                       newRev == existingRev && existing.deleteTime > 0 && 
s.deleteTime == 0 {
-                       c.entries[kind][propID] = s
-               }
-       }
-}
-
 type propInfo struct {
        nodeRev     map[string]int64
        nodeDelTime map[string]int64
@@ -443,23 +417,9 @@ func (r *SchemaRegistry) getSchema(ctx context.Context, 
kind schema.Kind,
        }
        propID := query.Ids[0]
        info := propMap[propID]
-       if !r.started.Load() {
-               if info == nil || info.best == nil || info.best.deleteTime > 0 {
-                       return nil, nil
-               }
-               return info.best.property, nil
-       }
        if info == nil || info.best == nil || info.best.deleteTime > 0 {
-               entry := r.cache.Get(propID)
-               if entry != nil {
-                       r.handleDeletion(kind, propID, entry, 
entry.latestUpdateAt)
-               }
                return nil, nil
        }
-       md, convErr := ToSchema(kind, info.best.property)
-       if convErr == nil {
-               r.processInitialResourceFromProperty(kind, info.best.property, 
md.Spec.(proto.Message))
-       }
        return info.best.property, nil
 }
 
@@ -471,39 +431,10 @@ func (r *SchemaRegistry) listSchemas(ctx context.Context, 
kind schema.Kind,
        if queryErr != nil {
                return nil, queryErr
        }
-       if !r.started.Load() {
-               result := make([]*propertyv1.Property, 0, len(propMap))
-               for _, info := range propMap {
-                       if info.best != nil && info.best.deleteTime == 0 {
-                               result = append(result, info.best.property)
-                       }
-               }
-               return result, nil
-       }
-       serverPropIDs := make(map[string]struct{})
        result := make([]*propertyv1.Property, 0, len(propMap))
-       for propID, info := range propMap {
-               if info.best == nil || info.best.deleteTime != 0 {
-                       entry := r.cache.Get(propID)
-                       if entry != nil {
-                               r.handleDeletion(kind, propID, entry, 
entry.latestUpdateAt)
-                       }
-                       continue
-               }
-               serverPropIDs[propID] = struct{}{}
-               md, convErr := ToSchema(kind, info.best.property)
-               if convErr == nil {
-                       r.processInitialResourceFromProperty(kind, 
info.best.property, md.Spec.(proto.Message))
-               }
-               result = append(result, info.best.property)
-       }
-       cachedEntries := r.cache.GetEntriesByKind(kind)
-       for propID, entry := range cachedEntries {
-               if group != "" && entry.group != group {
-                       continue
-               }
-               if _, found := serverPropIDs[propID]; !found {
-                       r.handleDeletion(kind, propID, entry, 
entry.latestUpdateAt)
+       for _, info := range propMap {
+               if info.best != nil && info.best.deleteTime == 0 {
+                       result = append(result, info.best.property)
                }
        }
        return result, nil
@@ -1041,10 +972,10 @@ func (r *SchemaRegistry) Start(ctx context.Context) 
error {
                        }
                }
        }
-       r.forEachActiveNode(func(_ string, sc *schemaClient) error {
-               return r.initializeFromSchemaClient(ctx, sc)
-       }, "failed to initialize from schema client at startup")
-       r.started.Store(true)
+       // Replay all cached entries to handlers. Watch replay (launched during
+       // OnActive in PreRun) may have populated the cache before handlers were
+       // registered. This explicit replay ensures handlers see every entry.
+       r.notifyHandlersFromCache()
        go r.syncLoop(ctx)
        return nil
 }
@@ -1063,260 +994,349 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
                case <-r.closer.CloseNotify():
                        return
                case <-ticker.C:
-                       r.performSync(ctx)
+                       r.syncRound++
+                       if r.syncRound%r.fullReconcileEvery == 0 {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+                               r.performFullSync(ctx)
+                       } else {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+                               r.performIncrementalSync(ctx)
+                       }
                }
        }
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-       round := atomic.AddUint64(&r.syncRound, 1)
-       isFullReconcile := r.shouldFullReconcile(round)
-       names := r.connMgr.ActiveNames()
-       if len(names) == 0 {
-               r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-               return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if old, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+               old.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       ctx, cancel := context.WithCancel(r.closer.Ctx())
+       session := &watchSession{
+               cancelFn:  cancel,
+               syncReqCh: make(chan *syncRequest, 1),
+               syncResCh: make(chan []*digestEntry, 1),
        }
-       r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-               Int("activeNodes", len(names)).Msg("performSync: starting")
-       collector := newSyncCollector()
-       if isFullReconcile {
-               r.collectFullSync(ctx, collector)
+       r.watchSessions[nodeName] = session
+       r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(ctx, nodeName, client.update, session)
+               }()
        } else {
-               r.collectIncrementalSync(ctx, collector)
+               cancel()
+               delete(r.watchSessions, nodeName)
        }
-       r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-       r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-       kindsToSync := r.getKindsToSync()
-       if len(kindsToSync) == 0 {
-               return
-       }
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               for _, kind := range kindsToSync {
-                       query := buildSchemaQuery(kind, "", "", 0)
-                       schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if queryErr != nil {
-                               return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-                       }
-                       if r.l.Debug().Enabled() {
-                               activeSchemas := 0
-                               deletedSchemas := 0
-                               for _, s := range schemas {
-                                       if s.deleteTime > 0 {
-                                               deletedSchemas++
-                                               continue
-                                       }
-                                       activeSchemas++
-                               }
-                               r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                                       Int("schemas", len(schemas)).
-                                       Int("activeSchemas", activeSchemas).
-                                       Int("deletedSchemas", deletedSchemas).
-                                       Msg("collectFullSync: collected")
-                       }
-                       collector.add(nodeName, kind, schemas)
-               }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if session, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling 
watch session")
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
        }
 }
 
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector 
*syncCollector) {
-       sinceRevision := r.cache.GetMaxRevision()
-       r.l.Debug().Int64("sinceRevision", 
sinceRevision).Msg("collectIncrementalSync: querying updates")
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, 
sc.update, sinceRevision)
-               if queryErr != nil {
-                       return queryErr
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+       backoff := watchInitialBackoff
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
                }
-               r.l.Debug().Str("node", nodeName).Int("updatedKinds", 
len(updatedKindNames)).
-                       Strs("kinds", 
updatedKindNames).Msg("collectIncrementalSync: got updates")
-               for _, kindName := range updatedKindNames {
-                       kind, kindErr := KindFromString(kindName)
-                       if kindErr != nil {
-                               r.l.Warn().Str("kindName", 
kindName).Msg("unknown kind from aggregate update")
-                               continue
-                       }
-                       query := buildSchemaQuery(kind, "", "", sinceRevision)
-                       schemas, schemasErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if schemasErr != nil {
-                               return fmt.Errorf("failed to query updated 
schemas for kind %s: %w", kind, schemasErr)
+               err := r.processWatchSession(ctx, updateClient, session)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
                        }
-                       r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                               Int("schemas", 
len(schemas)).Msg("collectIncrementalSync: collected")
-                       collector.add(nodeName, kind, schemas)
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
                }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect 
incremental sync from some nodes")
        }
 }
 
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
-       for kind, propMap := range collector.entries {
-               cachedEntries := r.cache.GetEntriesByKind(kind)
-               seen := make(map[string]bool, len(propMap))
-               for propID, s := range propMap {
-                       seen[propID] = true
-                       if s.deleteTime > 0 {
-                               if entry, exists := cachedEntries[propID]; 
exists {
-                                       r.handleDeletion(kind, propID, entry, 
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+       stream, err := updateClient.WatchSchemas(ctx)
+       if err != nil {
+               return err
+       }
+
+       maxRev := r.cache.GetMaxRevision()
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: 
sending initial replay request")
+       initReq := &schemav1.WatchSchemasRequest{
+               Criteria: buildRevisionCriteria(maxRev),
+       }
+       if sendErr := stream.Send(initReq); sendErr != nil {
+               return sendErr
+       }
+
+       recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+       recvErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       resp, recvErr := stream.Recv()
+                       if recvErr != nil {
+                               recvErrCh <- recvErr
+                               close(recvCh)
+                               return
+                       }
+                       recvCh <- resp
+               }
+       }()
+
+       var inSync bool
+       var metadataOnly bool
+       var digests []*digestEntry
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case err := <-recvErrCh:
+                       return err
+               case req := <-session.syncReqCh:
+                       r.l.Debug().Bool("metadataOnly", len(req.tagProjection) 
> 0).
+                               Bool("hasCriteria", req.criteria != 
nil).Msg("processWatchSession: sending sync request")
+                       if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+                               Criteria:      req.criteria,
+                               TagProjection: req.tagProjection,
+                       }); sendErr != nil {
+                               return sendErr
+                       }
+                       inSync = true
+                       metadataOnly = len(req.tagProjection) > 0
+                       digests = nil
+               case resp, ok := <-recvCh:
+                       if !ok {
+                               return <-recvErrCh
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               r.l.Debug().Bool("inSync", 
inSync).Msg("processWatchSession: received REPLAY_DONE")
+                               if inSync {
+                                       r.l.Debug().Int("digestCount", 
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+                                       session.syncResCh <- digests
+                                       digests = nil
+                                       inSync = false
+                                       metadataOnly = false
                                }
                                continue
                        }
-                       md, convErr := ToSchema(kind, s.property)
-                       if convErr != nil {
-                               r.l.Warn().Err(convErr).Str("propID", 
propID).Msg("failed to convert property for reconcile")
-                               continue
+                       if inSync && metadataOnly {
+                               digests = append(digests, r.parseDigest(resp))
+                       } else {
+                               r.handleWatchEvent(resp)
                        }
-                       r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
                }
        }
 }
 
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
-       r.mux.RLock()
-       handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
-       for kind := range r.handlers {
-               handlerKindSet[kind] = struct{}{}
-       }
-       r.mux.RUnlock()
-       for _, kind := range r.cache.GetCachedKinds() {
-               handlerKindSet[kind] = struct{}{}
-       }
-       kinds := make([]schema.Kind, 0, len(handlerKindSet))
-       for kind := range handlerKindSet {
-               kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) 
*digestEntry {
+       prop := resp.GetProperty()
+       parsed := ParseTags(prop.GetTags())
+       return &digestEntry{
+               propID:     prop.GetId(),
+               kind:       parsed.Kind,
+               group:      parsed.Group,
+               name:       parsed.Name,
+               revision:   parsed.UpdatedAt,
+               deleteTime: resp.GetDeleteTime(),
        }
-       return kinds
 }
 
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client 
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
-       query := buildUpdatedSchemasQuery(sinceRevision)
-       resp, rpcErr := client.AggregateSchemaUpdates(ctx, 
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
-       if rpcErr != nil {
-               return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) 
{
+       prop := resp.GetProperty()
+       if prop == nil {
+               return
+       }
+       parsed := ParseTags(prop.GetTags())
+       kindStr := parsed.Kind
+       if kindStr == "" {
+               kindStr = prop.GetMetadata().GetName()
+       }
+       kind, kindErr := KindFromString(kindStr)
+       if kindErr != nil {
+               r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in 
event")
+               return
+       }
+       r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", 
kind).
+               Str("propID", prop.GetId()).Msg("watch: received event")
+       switch resp.GetEventType() {
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+               md, convErr := ToSchema(kind, prop)
+               if convErr != nil {
+                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert property")
+                       return
+               }
+               r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+               propID := prop.GetId()
+               if r.cache.Delete(propID, resp.GetDeleteTime()) {
+                       md, convErr := ToSchema(kind, prop)
+                       if convErr != nil {
+                               r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert deleted property")
+                               return
+                       }
+                       r.notifyHandlers(kind, md, true)
+               }
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+               // handled by processWatchSession, not expected here
        }
-       return resp.GetNames(), nil
 }
 
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
-       return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               return
+       }
+       maxRev := r.cache.GetMaxRevision()
+       req := &syncRequest{
+               criteria: buildRevisionCriteria(maxRev),
+       }
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: 
requesting changes")
+       r.sendSyncRequest(ctx, sessions, req)
 }
 
-type kindGroupPair struct {
-       groupName string
-       kind      schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       sessions := make(map[string]*watchSession, len(r.watchSessions))
+       for name, s := range r.watchSessions {
+               sessions[name] = s
+       }
+       return sessions
 }
 
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc 
*schemaClient) error {
-       groups, listErr := r.listGroupFromClient(ctx, sc.management)
-       if listErr != nil {
-               return listErr
-       }
-       var pairs []kindGroupPair
-       for _, group := range groups {
-               r.processInitialResource(schema.KindGroup, group)
-               catalog := group.GetCatalog()
-               groupName := group.GetMetadata().GetName()
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindStream},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindMeasure},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTopNAggregation},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_TRACE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTrace},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_PROPERTY:
-                       pairs = append(pairs, kindGroupPair{groupName: 
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+       sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+       r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: 
dispatching to watch sessions")
+       sent := make(map[string]*watchSession, len(sessions))
+       for name, s := range sessions {
+               select {
+               case s.syncReqCh <- req:
+                       sent[name] = s
                default:
-                       r.l.Error().Stringer("catalog", catalog).Str("group", 
groupName).Msg("unknown catalog type during initialization")
+                       r.l.Warn().Str("node", name).Msg("sendSyncRequest: 
channel full, skipping session")
                }
        }
-       var wg sync.WaitGroup
-       for _, pair := range pairs {
-               currentPair := pair
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       r.initResourcesFromClient(ctx, sc.management, 
currentPair.kind, currentPair.groupName)
-               }()
+       allDigests := make(map[string][]*digestEntry, len(sent))
+       for name, s := range sent {
+               select {
+               case digests := <-s.syncResCh:
+                       allDigests[name] = digests
+               case <-time.After(30 * time.Second):
+                       r.l.Warn().Str("node", name).Msg("sync timeout")
+               case <-ctx.Done():
+                       return nil
+               }
        }
-       wg.Wait()
-       return nil
+       return allDigests
 }
 
-func (r *SchemaRegistry) listGroupFromClient(ctx context.Context, client 
schemav1.SchemaManagementServiceClient) ([]*commonv1.Group, error) {
-       query := buildSchemaQuery(schema.KindGroup, "", "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               return nil, queryErr
+func (r *SchemaRegistry) performFullSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               r.l.Debug().Msg("performFullSync: no active watch sessions, 
skipping")
+               return
        }
-       var groups []*commonv1.Group
-       for _, s := range results {
-               if s.deleteTime > 0 {
-                       continue
-               }
-               md, convErr := ToSchema(schema.KindGroup, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Msg("failed to convert group 
property")
-                       continue
-               }
-               g, ok := md.Spec.(*commonv1.Group)
-               if !ok {
-                       continue
-               }
-               groups = append(groups, g)
+       req := &syncRequest{
+               tagProjection: basicTagKeys,
        }
-       return groups, nil
-}
-
-func (r *SchemaRegistry) initResourcesFromClient(ctx context.Context,
-       client schemav1.SchemaManagementServiceClient, kind schema.Kind, 
groupName string,
-) {
-       query := buildSchemaQuery(kind, groupName, "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               r.l.Warn().Err(queryErr).Stringer("kind", kind).Str("group", 
groupName).Msg("failed to list resources for init")
+       allDigests := r.sendSyncRequest(ctx, sessions, req)
+       if allDigests == nil {
                return
        }
-       for _, s := range results {
-               if s.deleteTime > 0 {
+
+       merged := r.mergeDigests(allDigests)
+       cachedEntries := r.cache.GetAllEntries()
+       r.l.Debug().Int("mergedCount", len(merged)).Int("cachedCount", 
len(cachedEntries)).
+               Msg("performFullSync: comparing server digests with local 
cache")
+
+       for propID, d := range merged {
+               if d.deleteTime > 0 {
+                       if entry, exists := cachedEntries[propID]; exists {
+                               kind, kindErr := KindFromString(d.kind)
+                               if kindErr != nil {
+                                       r.l.Warn().Str("kind", 
d.kind).Msg("full sync: unknown kind")
+                                       continue
+                               }
+                               r.handleDeletion(kind, propID, entry, 
d.revision)
+                       }
                        continue
                }
-               md, convErr := ToSchema(kind, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("failed to convert property for init")
-                       continue
+               localEntry := cachedEntries[propID]
+               if localEntry == nil || localEntry.latestUpdateAt < d.revision {
+                       kind, kindErr := KindFromString(d.kind)
+                       if kindErr != nil {
+                               r.l.Warn().Str("kind", d.kind).Msg("full sync: 
unknown kind")
+                               continue
+                       }
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
kind).
+                               Int64("serverRevision", 
d.revision).Msg("performFullSync: fetching updated schema from server")
+                       prop, err := r.getSchema(ctx, kind, d.group, d.name)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to get schema")
+                               continue
+                       }
+                       if prop == nil {
+                               continue
+                       }
+                       md, err := ToSchema(kind, prop)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to convert property")
+                               continue
+                       }
+                       r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+               }
+       }
+
+       for propID, entry := range cachedEntries {
+               if _, exists := merged[propID]; !exists {
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
entry.kind).
+                               Msg("performFullSync: cached entry not found on 
server, deleting")
+                       r.handleDeletion(entry.kind, propID, entry, 
entry.latestUpdateAt)
                }
-               r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
        }
 }
 
-func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec 
proto.Message) {
-       prop, convErr := SchemaToProperty(kind, spec)
-       if convErr != nil {
-               r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to 
convert spec to property for initial resource")
-               return
+func (r *SchemaRegistry) mergeDigests(allDigests map[string][]*digestEntry) 
map[string]*digestEntry {
+       merged := make(map[string]*digestEntry)
+       for _, digests := range allDigests {
+               for _, d := range digests {
+                       existing, exists := merged[d.propID]
+                       if !exists || d.revision > existing.revision ||
+                               (d.revision == existing.revision && 
d.deleteTime > existing.deleteTime) {
+                               merged[d.propID] = d
+                       }
+               }
        }
-       r.processInitialResourceFromProperty(kind, prop, spec)
+       return merged
 }
 
 func (r *SchemaRegistry) processInitialResourceFromProperty(kind schema.Kind, 
prop *propertyv1.Property, spec proto.Message) {
@@ -1345,6 +1365,8 @@ func (r *SchemaRegistry) 
processInitialResourceFromProperty(kind schema.Kind, pr
 }
 
 func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry 
*cacheEntry, revision int64) {
+       r.l.Debug().Stringer("kind", kind).Str("propID", 
propID).Int64("revision", revision).
+               Msg("handleDeletion: attempting to delete from cache")
        if r.cache.Delete(propID, revision) {
                r.notifyHandlers(kind, schema.Metadata{
                        TypeMeta: schema.TypeMeta{
@@ -1372,6 +1394,21 @@ func (r *SchemaRegistry) notifyHandlers(kind 
schema.Kind, md schema.Metadata, is
        }
 }
 
+func (r *SchemaRegistry) notifyHandlersFromCache() {
+       entries := r.cache.GetAllEntries()
+       r.l.Debug().Int("entryCount", 
len(entries)).Msg("notifyHandlersFromCache: replaying cached entries to 
handlers")
+       for _, entry := range entries {
+               r.notifyHandlers(entry.kind, schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind:  entry.kind,
+                               Name:  entry.name,
+                               Group: entry.group,
+                       },
+                       Spec: entry.spec,
+               }, false)
+       }
+}
+
 // ActiveNodeNames returns the names of all active schema server nodes.
 func (r *SchemaRegistry) ActiveNodeNames() []string {
        return r.connMgr.ActiveNames()
@@ -1382,6 +1419,12 @@ func (r *SchemaRegistry) Close() error {
        if r.caCertReloader != nil {
                r.caCertReloader.Stop()
        }
+       r.watchMu.Lock()
+       for nodeName, session := range r.watchSessions {
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       r.watchMu.Unlock()
        r.closer.Done()
        r.closer.CloseThenWait()
        r.connMgr.GracefulStop()
diff --git a/banyand/metadata/schema/property/client_test.go 
b/banyand/metadata/schema/property/client_test.go
index ce3f9616a..4986e9192 100644
--- a/banyand/metadata/schema/property/client_test.go
+++ b/banyand/metadata/schema/property/client_test.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "net"
        "strconv"
-       "strings"
        "sync"
        "testing"
        "time"
@@ -37,6 +36,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -46,7 +46,10 @@ import (
        testflags "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-const testStreamName = "test-stream"
+const (
+       testStreamName       = "test-stream"
+       testDeleteStreamName = "delete-stream"
+)
 
 func startTestSchemaServer(t *testing.T) string {
        t.Helper()
@@ -55,6 +58,12 @@ func startTestSchemaServer(t *testing.T) string {
 }
 
 func startTestSchemaServerStoppable(t *testing.T) (string, func()) {
+       t.Helper()
+       _, addr, stopFn := startTestSchemaServerFull(t)
+       return addr, stopFn
+}
+
+func startTestSchemaServerFull(t *testing.T) (schemaserver.Server, string, 
func()) {
        t.Helper()
        srv := schemaserver.NewServer(observability.BypassRegistry)
        flagSet := srv.FlagSet()
@@ -67,7 +76,9 @@ func startTestSchemaServerStoppable(t *testing.T) (string, 
func()) {
        require.NoError(t, srv.Validate())
        require.NoError(t, srv.PreRun(context.Background()))
        srv.Serve()
-       t.Cleanup(func() { srv.GracefulStop() })
+       var once sync.Once
+       stopFn := func() { once.Do(func() { srv.GracefulStop() }) }
+       t.Cleanup(stopFn)
        addr := net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(port), 
10))
        deadline := time.Now().Add(5 * time.Second)
        for {
@@ -81,7 +92,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string, 
func()) {
                }
                time.Sleep(50 * time.Millisecond)
        }
-       return addr, func() { srv.GracefulStop() }
+       return srv, addr, stopFn
 }
 
 func getFreePort(t *testing.T) uint32 {
@@ -899,21 +910,14 @@ func TestOnAddOrUpdate_NewNodeWithData(t *testing.T) {
        reg.RegisterHandler("test-handler", schema.KindGroup|schema.KindStream, 
handler)
        // Now add server2 which has pre-existing data.
        addNodeToRegistry(reg, "node-2", addr2)
-       // Start triggers initializeFromSchemaClient for all active nodes.
+       // Start replays cached entries to handlers. Watch replay is async, so
+       // we need to wait for the handler to receive notifications.
        reg.Start(context.Background())
        // Handler should receive OnAddOrUpdate events for the group and stream 
from server2.
-       events := handler.getAddOrUpdateEvents()
-       var hasGroup, hasStream bool
-       for _, evt := range events {
-               if evt.Kind == schema.KindGroup && evt.Name == "test-group" {
-                       hasGroup = true
-               }
-               if evt.Kind == schema.KindStream && evt.Name == testStreamName {
-                       hasStream = true
-               }
-       }
-       assert.True(t, hasGroup, "handler should receive group OnAddOrUpdate, 
events: %+v", events)
-       assert.True(t, hasStream, "handler should receive stream OnAddOrUpdate, 
events: %+v", events)
+       groupEvt := waitForAddOrUpdateEvent(t, handler, schema.KindGroup, 
"test-group")
+       assert.Equal(t, schema.KindGroup, groupEvt.Kind)
+       streamEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, 
testStreamName)
+       assert.Equal(t, "test-group", streamEvt.Group)
        // Registry should be able to read the stream.
        ctx := context.Background()
        got, getErr := reg.GetStream(ctx, &commonv1.Metadata{Group: 
"test-group", Name: testStreamName})
@@ -1064,29 +1068,37 @@ func TestOnAddOrUpdate_HandlerNotification(t 
*testing.T) {
        reg.RegisterHandler("measure-handler", schema.KindMeasure, 
measureHandler)
        // Add server2 with pre-existing data.
        addNodeToRegistry(reg, "node-2", addr2)
-       // Start triggers initializeFromSchemaClient for all active nodes.
+       // Start replays cached entries to handlers. Watch replay is async.
        reg.Start(context.Background())
-       time.Sleep(500 * time.Millisecond)
-       // Stream handler should get stream events but not measure events.
-       streamEvents := streamHandler.getAddOrUpdateEvents()
-       var streamHasStream bool
-       for _, evt := range streamEvents {
-               if evt.Kind == schema.KindStream && evt.Name == testStreamName {
-                       streamHasStream = true
+       // Wait for async watch replay to complete and handlers to receive 
events.
+       require.Eventually(t, func() bool {
+               streamEvents := streamHandler.getAddOrUpdateEvents()
+               var streamHasStream bool
+               for _, evt := range streamEvents {
+                       if evt.Kind == schema.KindStream && evt.Name == 
testStreamName {
+                               streamHasStream = true
+                       }
                }
-               assert.NotEqual(t, schema.KindMeasure, evt.Kind, "stream 
handler should not receive measure events")
-       }
-       assert.True(t, streamHasStream, "stream handler should receive stream 
OnAddOrUpdate")
-       // Measure handler should get measure events but not stream events.
-       measureEvents := measureHandler.getAddOrUpdateEvents()
-       var measureHasMeasure bool
-       for _, evt := range measureEvents {
-               if evt.Kind == schema.KindMeasure && evt.Name == "test-measure" 
{
-                       measureHasMeasure = true
+               measureEvents := measureHandler.getAddOrUpdateEvents()
+               var measureHasMeasure bool
+               for _, evt := range measureEvents {
+                       if evt.Kind == schema.KindMeasure && evt.Name == 
"test-measure" {
+                               measureHasMeasure = true
+                       }
                }
-               assert.NotEqual(t, schema.KindStream, evt.Kind, "measure 
handler should not receive stream events")
-       }
-       assert.True(t, measureHasMeasure, "measure handler should receive 
measure OnAddOrUpdate")
+               return streamHasStream && measureHasMeasure
+       }, 5*time.Second, 100*time.Millisecond, "handlers should receive their 
respective events")
+       // Verify handler isolation with concrete data checks.
+       streamEvents := streamHandler.getAddOrUpdateEvents()
+       assert.Len(t, streamEvents, 1, "stream handler should receive exactly 1 
event")
+       assert.Equal(t, schema.KindStream, streamEvents[0].Kind)
+       assert.Equal(t, testStreamName, streamEvents[0].Name)
+       assert.Equal(t, "test-group", streamEvents[0].Group)
+       measureEvents := measureHandler.getAddOrUpdateEvents()
+       assert.Len(t, measureEvents, 1, "measure handler should receive exactly 
1 event")
+       assert.Equal(t, schema.KindMeasure, measureEvents[0].Kind)
+       assert.Equal(t, "test-measure", measureEvents[0].Name)
+       assert.Equal(t, "measure-group", measureEvents[0].Group)
 }
 
 func startTestSchemaServerTLS(t *testing.T) string {
@@ -1183,9 +1195,8 @@ type storedSchema struct {
 type recordingSchemaServer struct {
        schemav1.UnimplementedSchemaManagementServiceServer
        schemav1.UnimplementedSchemaUpdateServiceServer
-       schemas        map[string]*storedSchema
-       sinceRevisions []int64
-       mu             sync.Mutex
+       schemas map[string]*storedSchema
+       mu      sync.Mutex
 }
 
 func newRecordingSchemaServer() *recordingSchemaServer {
@@ -1200,27 +1211,18 @@ func (s *recordingSchemaServer) addSchema(prop 
*propertyv1.Property) {
        s.schemas[prop.GetId()] = &storedSchema{prop: prop}
 }
 
-func (s *recordingSchemaServer) markDeleted(propID string) {
+func (s *recordingSchemaServer) updateSchema(prop *propertyv1.Property) {
        s.mu.Lock()
        defer s.mu.Unlock()
-       if stored, ok := s.schemas[propID]; ok {
-               stored.deleteTime = time.Now().UnixNano()
-       }
-}
-
-func (s *recordingSchemaServer) aggregateCallCount() int {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-       return len(s.sinceRevisions)
+       s.schemas[prop.GetId()] = &storedSchema{prop: prop}
 }
 
-func (s *recordingSchemaServer) lastSinceRevision() int64 {
+func (s *recordingSchemaServer) markDeleted(propID string) {
        s.mu.Lock()
        defer s.mu.Unlock()
-       if len(s.sinceRevisions) == 0 {
-               return 0
+       if stored, ok := s.schemas[propID]; ok {
+               stored.deleteTime = time.Now().UnixNano()
        }
-       return s.sinceRevisions[len(s.sinceRevisions)-1]
 }
 
 func (s *recordingSchemaServer) ListSchemas(req *schemav1.ListSchemasRequest, 
stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse]) error {
@@ -1258,26 +1260,62 @@ func (s *recordingSchemaServer) ListSchemas(req 
*schemav1.ListSchemasRequest, st
        return nil
 }
 
-func (s *recordingSchemaServer) AggregateSchemaUpdates(_ context.Context, req 
*schemav1.AggregateSchemaUpdatesRequest) 
(*schemav1.AggregateSchemaUpdatesResponse, error) {
-       sinceRevision := extractSinceRevision(req.GetQuery())
-       s.mu.Lock()
-       s.sinceRevisions = append(s.sinceRevisions, sinceRevision)
-       kindSet := make(map[string]struct{})
-       for _, stored := range s.schemas {
-               if stored.deleteTime > 0 {
-                       continue
+func (s *recordingSchemaServer) WatchSchemas(stream 
grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse]) error {
+       for {
+               req, err := stream.Recv()
+               if err != nil {
+                       return err
+               }
+               metadataOnly := len(req.TagProjection) > 0
+               maxRevision := extractMaxRevisionFromCriteria(req.Criteria)
+               s.mu.Lock()
+               for _, stored := range s.schemas {
+                       parsed := property.ParseTags(stored.prop.GetTags())
+                       if maxRevision > 0 && parsed.UpdatedAt <= maxRevision {
+                               continue
+                       }
+                       eventType := 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT
+                       if stored.deleteTime > 0 {
+                               eventType = 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE
+                       }
+                       if sendErr := 
stream.Send(&schemav1.WatchSchemasResponse{
+                               EventType:    eventType,
+                               Property:     stored.prop,
+                               MetadataOnly: metadataOnly,
+                               DeleteTime:   stored.deleteTime,
+                       }); sendErr != nil {
+                               s.mu.Unlock()
+                               return sendErr
+                       }
                }
-               parsed := property.ParseTags(stored.prop.GetTags())
-               if parsed.UpdatedAt > sinceRevision {
-                       kindSet[stored.prop.GetMetadata().GetName()] = 
struct{}{}
+               s.mu.Unlock()
+               if sendErr := stream.Send(&schemav1.WatchSchemasResponse{
+                       EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+               }); sendErr != nil {
+                       return sendErr
                }
        }
-       s.mu.Unlock()
-       names := make([]string, 0, len(kindSet))
-       for kindName := range kindSet {
-               names = append(names, kindName)
+}
+
+// extractMaxRevisionFromCriteria extracts the max_revision value from a 
Criteria condition.
+// It looks for a condition on "updated_at" with BINARY_OP_GT.
+func extractMaxRevisionFromCriteria(c *modelv1.Criteria) int64 {
+       if c == nil {
+               return 0
+       }
+       if cond := c.GetCondition(); cond != nil {
+               if cond.GetName() == "updated_at" && cond.GetOp() == 
modelv1.Condition_BINARY_OP_GT {
+                       return cond.GetValue().GetInt().GetValue()
+               }
+               return 0
+       }
+       if le := c.GetLe(); le != nil {
+               if v := extractMaxRevisionFromCriteria(le.GetLeft()); v > 0 {
+                       return v
+               }
+               return extractMaxRevisionFromCriteria(le.GetRight())
        }
-       return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+       return 0
 }
 
 func (s *recordingSchemaServer) InsertSchema(_ context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
@@ -1299,14 +1337,6 @@ func (s *recordingSchemaServer) DeleteSchema(_ 
context.Context, req *schemav1.De
        return &schemav1.DeleteSchemaResponse{Found: false}, nil
 }
 
-func extractSinceRevision(query *propertyv1.QueryRequest) int64 {
-       cond := query.GetCriteria().GetCondition()
-       if cond != nil && cond.GetName() == "updated_at" {
-               return cond.GetValue().GetInt().GetValue()
-       }
-       return 0
-}
-
 func startRecordingServer(t *testing.T) (*recordingSchemaServer, string) {
        t.Helper()
        mock := newRecordingSchemaServer()
@@ -1372,30 +1402,35 @@ func buildMockStreamProperty(t *testing.T, name string, 
updatedAt time.Time) *pr
 }
 
 func TestSyncLoop(t *testing.T) {
-       t.Run("incremental_sync_calls_aggregate", func(t *testing.T) {
+       t.Run("full_reconcile_detects_deletion", func(t *testing.T) {
                mock, addr := startRecordingServer(t)
                mock.addSchema(buildMockGroupProperty(t))
                mock.addSchema(buildMockStreamProperty(t, testStreamName, 
time.Now()))
+               streamPropID := property.BuildPropertyID(schema.KindStream, 
&commonv1.Metadata{Group: "test-group", Name: testStreamName})
                reg := newTestRegistryWithConfig(t, &property.ClientConfig{
-                       SyncInterval:       50 * time.Millisecond,
-                       FullReconcileEvery: 100,
+                       SyncInterval: 50 * time.Millisecond,
                }, addr)
                handler := &testEventHandler{}
                reg.RegisterHandler("sync-handler", 
schema.KindGroup|schema.KindStream, handler)
                ctx, cancel := context.WithCancel(context.Background())
                t.Cleanup(cancel)
                require.NoError(t, reg.Start(ctx))
-               require.Eventually(t, func() bool {
-                       return mock.aggregateCallCount() >= 3
-               }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected 
at least 3 AggregateSchemaUpdates calls")
+               initEvt := waitForAddOrUpdateEvent(t, handler, 
schema.KindStream, testStreamName)
+               assert.Equal(t, "test-group", initEvt.Group)
+               // Mark stream as deleted on the server.
+               mock.markDeleted(streamPropID)
+               // Full reconcile should detect the deletion and notify the 
handler.
+               deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream, 
testStreamName)
+               assert.Equal(t, "test-group", deleteEvt.Group)
        })
 
-       t.Run("full_sync_skips_aggregate", func(t *testing.T) {
+       t.Run("full_reconcile_detects_update", func(t *testing.T) {
                mock, addr := startRecordingServer(t)
                mock.addSchema(buildMockGroupProperty(t))
-               mock.addSchema(buildMockStreamProperty(t, testStreamName, 
time.Now()))
+               initialTime := time.Now()
+               mock.addSchema(buildMockStreamProperty(t, testStreamName, 
initialTime))
                reg := newTestRegistryWithConfig(t, &property.ClientConfig{
-                       SyncInterval:       50 * time.Millisecond,
+                       SyncInterval:       500 * time.Millisecond,
                        FullReconcileEvery: 1,
                }, addr)
                handler := &testEventHandler{}
@@ -1403,26 +1438,30 @@ func TestSyncLoop(t *testing.T) {
                ctx, cancel := context.WithCancel(context.Background())
                t.Cleanup(cancel)
                require.NoError(t, reg.Start(ctx))
-               // Wait for init to deliver the stream.
+               waitForAddOrUpdateEvent(t, handler, schema.KindStream, 
testStreamName)
+               // Update the stream on the server with a newer revision.
+               mock.updateSchema(buildMockStreamProperty(t, testStreamName, 
initialTime.Add(10*time.Second)))
+               // Full reconcile should detect the higher revision and 
re-fetch + update cache.
                require.Eventually(t, func() bool {
+                       count := 0
                        for _, evt := range handler.getAddOrUpdateEvents() {
                                if evt.Kind == schema.KindStream && evt.Name == 
testStreamName {
-                                       return true
+                                       count++
                                }
                        }
-                       return false
-               }, testflags.EventuallyTimeout, 20*time.Millisecond)
-               // Let several sync rounds fire (all full).
-               time.Sleep(300 * time.Millisecond)
-               assert.Equal(t, 0, mock.aggregateCallCount(), "full sync should 
not call AggregateSchemaUpdates")
+                       return count >= 2
+               }, testflags.EventuallyTimeout, 20*time.Millisecond, "full 
reconcile should detect updated stream")
+               latestEvt := findLastEvent(handler.getAddOrUpdateEvents(), 
schema.KindStream, testStreamName)
+               assert.Equal(t, "test-group", latestEvt.Group)
        })
 
-       t.Run("sinceRevision_advances_with_new_data", func(t *testing.T) {
-               mock, addr := startRecordingServer(t)
-               mock.addSchema(buildMockGroupProperty(t))
-               mock.addSchema(buildMockStreamProperty(t, testStreamName, 
time.Now()))
+       t.Run("incremental_sync_picks_up_new_data", func(t *testing.T) {
+               srv, addr, _ := startTestSchemaServerFull(t)
+               mgmt := rawMgmtClient(t, addr)
+               insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+               insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, 
"initial-stream", time.Now()))
                reg := newTestRegistryWithConfig(t, &property.ClientConfig{
-                       SyncInterval:       50 * time.Millisecond,
+                       SyncInterval:       500 * time.Millisecond,
                        FullReconcileEvery: 100,
                }, addr)
                handler := &testEventHandler{}
@@ -1430,43 +1469,22 @@ func TestSyncLoop(t *testing.T) {
                ctx, cancel := context.WithCancel(context.Background())
                t.Cleanup(cancel)
                require.NoError(t, reg.Start(ctx))
-               // Wait for at least 2 aggregate calls to establish a baseline 
revision.
-               require.Eventually(t, func() bool {
-                       return mock.aggregateCallCount() >= 2
-               }, testflags.EventuallyTimeout, 20*time.Millisecond)
-               rev0 := mock.lastSinceRevision()
-               assert.Greater(t, rev0, int64(0), "sinceRevision should be > 0 
after init")
-               // With no new data, sinceRevision should remain stable.
-               countBefore := mock.aggregateCallCount()
-               require.Eventually(t, func() bool {
-                       return mock.aggregateCallCount() >= countBefore+2
-               }, testflags.EventuallyTimeout, 20*time.Millisecond)
-               assert.Equal(t, rev0, mock.lastSinceRevision(), "sinceRevision 
should not change without new data")
-               // Add a new stream with a far-future updatedAt.
-               futureTime := time.Now().Add(time.Hour)
-               mock.addSchema(buildMockStreamProperty(t, "new-stream", 
futureTime))
-               // sinceRevision should advance after the new data is picked up.
-               require.Eventually(t, func() bool {
-                       return mock.lastSinceRevision() > rev0
-               }, testflags.EventuallyTimeout, 20*time.Millisecond, 
"sinceRevision should advance after new data")
-               // Handler should have received the new stream.
-               require.Eventually(t, func() bool {
-                       for _, evt := range handler.getAddOrUpdateEvents() {
-                               if evt.Kind == schema.KindStream && evt.Name == 
"new-stream" {
-                                       return true
-                               }
-                       }
-                       return false
-               }, testflags.EventuallyTimeout, 20*time.Millisecond, "handler 
should receive OnAddOrUpdate for new-stream")
+               waitForServerWatcher(t, srv)
+               waitForAddOrUpdateEvent(t, handler, schema.KindStream, 
"initial-stream")
+               // Insert new stream directly on server.
+               insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, 
"incremental-stream", time.Now()))
+               found := waitForAddOrUpdateEvent(t, handler, schema.KindStream, 
"incremental-stream")
+               assert.Equal(t, "test-group", found.Group)
        })
 
-       t.Run("full_reconcile_detects_deletion", func(t *testing.T) {
+       t.Run("full_reconcile_every_1_runs_each_tick", func(t *testing.T) {
                mock, addr := startRecordingServer(t)
                mock.addSchema(buildMockGroupProperty(t))
-               mock.addSchema(buildMockStreamProperty(t, testStreamName, 
time.Now()))
-               streamPropID := property.BuildPropertyID(schema.KindStream, 
&commonv1.Metadata{Group: "test-group", Name: testStreamName})
+               mock.addSchema(buildMockStreamProperty(t, "s1", time.Now()))
+               mock.addSchema(buildMockStreamProperty(t, "s2", time.Now()))
+               s2PropID := property.BuildPropertyID(schema.KindStream, 
&commonv1.Metadata{Group: "test-group", Name: "s2"})
                reg := newTestRegistryWithConfig(t, &property.ClientConfig{
-                       SyncInterval:       50 * time.Millisecond,
+                       SyncInterval:       500 * time.Millisecond,
                        FullReconcileEvery: 1,
                }, addr)
                handler := &testEventHandler{}
@@ -1474,25 +1492,220 @@ func TestSyncLoop(t *testing.T) {
                ctx, cancel := context.WithCancel(context.Background())
                t.Cleanup(cancel)
                require.NoError(t, reg.Start(ctx))
-               // Wait for init to deliver the stream.
-               require.Eventually(t, func() bool {
-                       for _, evt := range handler.getAddOrUpdateEvents() {
-                               if evt.Kind == schema.KindStream && evt.Name == 
testStreamName {
-                                       return true
-                               }
+               waitForAddOrUpdateEvent(t, handler, schema.KindStream, "s1")
+               waitForAddOrUpdateEvent(t, handler, schema.KindStream, "s2")
+               // Mark s2 as deleted. Full reconcile (every tick) should 
detect it quickly.
+               mock.markDeleted(s2PropID)
+               deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream, 
"s2")
+               assert.Equal(t, "test-group", deleteEvt.Group)
+       })
+}
+
+func waitForAddOrUpdateEvent(t *testing.T, handler *testEventHandler, kind 
schema.Kind, name string) schema.Metadata {
+       t.Helper()
+       require.Eventually(t, func() bool {
+               for _, evt := range handler.getAddOrUpdateEvents() {
+                       if evt.Kind == kind && evt.Name == name {
+                               return true
                        }
-                       return false
-               }, testflags.EventuallyTimeout, 20*time.Millisecond)
-               // Mark stream as deleted on the server.
-               mock.markDeleted(streamPropID)
-               // Full reconcile should detect the deletion and notify the 
handler.
-               require.Eventually(t, func() bool {
-                       for _, evt := range handler.getDeleteEvents() {
-                               if evt.Kind == schema.KindStream && 
strings.Contains(evt.Name, testStreamName) {
-                                       return true
-                               }
+               }
+               return false
+       }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected 
AddOrUpdate event for %s/%s", kind, name)
+       return findLastEvent(handler.getAddOrUpdateEvents(), kind, name)
+}
+
+func waitForDeleteEvent(t *testing.T, handler *testEventHandler, kind 
schema.Kind, name string) schema.Metadata {
+       t.Helper()
+       require.Eventually(t, func() bool {
+               for _, evt := range handler.getDeleteEvents() {
+                       if evt.Kind == kind && evt.Name == name {
+                               return true
                        }
-                       return false
-               }, testflags.EventuallyTimeout, 20*time.Millisecond, "handler 
should receive OnDelete for test-stream")
+               }
+               return false
+       }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected Delete 
event for %s/%s", kind, name)
+       return findLastEvent(handler.getDeleteEvents(), kind, name)
+}
+
+func findLastEvent(events []schema.Metadata, kind schema.Kind, name string) 
schema.Metadata {
+       for idx := len(events) - 1; idx >= 0; idx-- {
+               if events[idx].Kind == kind && events[idx].Name == name {
+                       return events[idx]
+               }
+       }
+       return schema.Metadata{}
+}
+
+func waitForServerWatcher(t *testing.T, srv schemaserver.Server) {
+       t.Helper()
+       type watcherCounter interface{ WatcherCount() int }
+       wc, ok := srv.(watcherCounter)
+       require.True(t, ok, "server does not implement WatcherCount")
+       require.Eventually(t, func() bool {
+               return wc.WatcherCount() > 0
+       }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected at least 
one watcher to connect")
+}
+
+func insertSchemaProperty(t *testing.T, mgmt 
schemav1.SchemaManagementServiceClient, prop *propertyv1.Property) {
+       t.Helper()
+       _, err := mgmt.InsertSchema(context.Background(), 
&schemav1.InsertSchemaRequest{Property: prop})
+       require.NoError(t, err)
+}
+
+func updateSchemaProperty(t *testing.T, mgmt 
schemav1.SchemaManagementServiceClient, prop *propertyv1.Property) {
+       t.Helper()
+       _, err := mgmt.UpdateSchema(context.Background(), 
&schemav1.UpdateSchemaRequest{Property: prop})
+       require.NoError(t, err)
+}
+
+func deleteSchemaProperty(t *testing.T, mgmt 
schemav1.SchemaManagementServiceClient, name, id string) {
+       t.Helper()
+       _, err := mgmt.DeleteSchema(context.Background(), 
&schemav1.DeleteSchemaRequest{
+               Delete:   &propertyv1.DeleteRequest{Group: schema.SchemaGroup, 
Name: name, Id: id},
+               UpdateAt: timestamppb.Now(),
+       })
+       require.NoError(t, err)
+}
+
// TestWatchPush verifies that schema mutations performed on a real schema
// server are delivered to clients through the WatchSchemas stream. Every
// subtest sets SyncInterval to 10 minutes so that only watch pushes — not
// the periodic sync loop — can deliver events within the test timeout.
func TestWatchPush(t *testing.T) {
	// An insert performed via the server RPC is pushed to the watching client.
	t.Run("watch_push_insert", func(t *testing.T) {
		srv, addr, _ := startTestSchemaServerFull(t)
		mgmt := rawMgmtClient(t, addr)
		insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv)
		// Insert a new stream via the real server RPC — triggers watch broadcast.
		insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, "watch-stream", time.Now()))
		insertEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, "watch-stream")
		assert.Equal(t, "test-group", insertEvt.Group)
	})

	// An update with a newer timestamp produces a fresh OnAddOrUpdate push.
	t.Run("watch_push_update", func(t *testing.T) {
		srv, addr, _ := startTestSchemaServerFull(t)
		mgmt := rawMgmtClient(t, addr)
		insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
		insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, "update-stream", time.Now().Add(-time.Hour)))
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv)
		initEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, "update-stream")
		assert.Equal(t, "test-group", initEvt.Group)
		// Record the count before the update so new pushes are detectable.
		initialCount := len(handler.getAddOrUpdateEvents())
		// Update with a newer timestamp via the real server RPC.
		updateSchemaProperty(t, mgmt, buildMockStreamProperty(t, "update-stream", time.Now()))
		require.Eventually(t, func() bool {
			return len(handler.getAddOrUpdateEvents()) > initialCount
		}, testflags.EventuallyTimeout, 20*time.Millisecond, "handler should receive watch push update")
		lastEvt := findLastEvent(handler.getAddOrUpdateEvents(), schema.KindStream, "update-stream")
		assert.Equal(t, "test-group", lastEvt.Group)
	})

	// A delete performed via the server RPC is pushed as an OnDelete event.
	t.Run("watch_push_delete", func(t *testing.T) {
		srv, addr, _ := startTestSchemaServerFull(t)
		mgmt := rawMgmtClient(t, addr)
		insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
		streamProp := buildMockStreamProperty(t, testDeleteStreamName, time.Now())
		insertSchemaProperty(t, mgmt, streamProp)
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv)
		initEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, testDeleteStreamName)
		assert.Equal(t, "test-group", initEvt.Group)
		// Delete via the real server RPC — triggers watch broadcast.
		deleteSchemaProperty(t, mgmt, streamProp.GetMetadata().GetName(), streamProp.GetId())
		deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream, testDeleteStreamName)
		assert.Equal(t, "test-group", deleteEvt.Group)
	})

	// A pushed update carrying an older revision than the cached one must
	// not reach the handler.
	t.Run("watch_older_revision_ignored", func(t *testing.T) {
		srv, addr, _ := startTestSchemaServerFull(t)
		mgmt := rawMgmtClient(t, addr)
		insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
		now := time.Now()
		insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, "rev-stream", now))
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv)
		initEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, "rev-stream")
		assert.Equal(t, "test-group", initEvt.Group)
		initialCount := len(handler.getAddOrUpdateEvents())
		// Update with an older version — should be ignored by cache revision check.
		updateSchemaProperty(t, mgmt, buildMockStreamProperty(t, "rev-stream", now.Add(-time.Hour)))
		// Grace period to let a (wrongly) pushed event arrive before asserting.
		time.Sleep(200 * time.Millisecond)
		assert.Equal(t, initialCount, len(handler.getAddOrUpdateEvents()), "older revision should not trigger handler")
	})

	// A node added to the registry at runtime gets its own watch stream, and
	// inserts on that node are pushed to the client.
	t.Run("watch_new_node_added", func(t *testing.T) {
		srv1, addr1, _ := startTestSchemaServerFull(t)
		mgmt1 := rawMgmtClient(t, addr1)
		insertSchemaProperty(t, mgmt1, buildMockGroupProperty(t))
		srv2, addr2, _ := startTestSchemaServerFull(t)
		mgmt2 := rawMgmtClient(t, addr2)
		insertSchemaProperty(t, mgmt2, buildMockGroupProperty(t))
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr1)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv1)
		// Dynamically add the second server.
		addNodeToRegistry(reg, "new-node", addr2)
		waitForServerWatcher(t, srv2)
		// Insert via the new server — client should receive it through watch.
		insertSchemaProperty(t, mgmt2, buildMockStreamProperty(t, "new-node-stream", time.Now()))
		newNodeEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, "new-node-stream")
		assert.Equal(t, "test-group", newNodeEvt.Group)
	})

	// A single client results in exactly one server-side watcher, and events
	// keep flowing through that watch stream.
	t.Run("watch_server_restart", func(t *testing.T) {
		srv, addr, _ := startTestSchemaServerFull(t)
		mgmt := rawMgmtClient(t, addr)
		insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
		reg := newTestRegistryWithConfig(t, &property.ClientConfig{
			SyncInterval: 10 * time.Minute,
		}, addr)
		handler := &testEventHandler{}
		reg.RegisterHandler("watch-handler", schema.KindGroup|schema.KindStream, handler)
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		require.NoError(t, reg.Start(ctx))
		waitForServerWatcher(t, srv)
		// Verify initial watcher count.
		type watcherCounter interface{ WatcherCount() int }
		wc, ok := srv.(watcherCounter)
		require.True(t, ok, "server does not implement WatcherCount")
		assert.Equal(t, 1, wc.WatcherCount())
		// Insert via the real server RPC — should be received through watch.
		insertSchemaProperty(t, mgmt, buildMockStreamProperty(t, "before-restart", time.Now()))
		restartEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream, "before-restart")
		assert.Equal(t, "test-group", restartEvt.Group)
	})
}
diff --git a/banyand/metadata/schema/property/converter.go 
b/banyand/metadata/schema/property/converter.go
index af6051c24..af6987731 100644
--- a/banyand/metadata/schema/property/converter.go
+++ b/banyand/metadata/schema/property/converter.go
@@ -166,6 +166,28 @@ func ParseTags(tags []*modelv1.Tag) ParsedTags {
        return pt
 }
 
+// basicTagKeys are the tag keys used for metadata-only projections (full sync 
digest).
+var basicTagKeys = []string{TagKeyKind, TagKeyGroup, TagKeyName, 
TagKeyUpdatedAt}
+
+// buildRevisionCriteria builds a Criteria for updated_at > sinceRevision.
+// Returns nil if sinceRevision <= 0.
+func buildRevisionCriteria(sinceRevision int64) *modelv1.Criteria {
+       if sinceRevision <= 0 {
+               return nil
+       }
+       return &modelv1.Criteria{
+               Exp: &modelv1.Criteria_Condition{
+                       Condition: &modelv1.Condition{
+                               Name: TagKeyUpdatedAt,
+                               Op:   modelv1.Condition_BINARY_OP_GT,
+                               Value: &modelv1.TagValue{
+                                       Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: sinceRevision}},
+                               },
+                       },
+               },
+       }
+}
+
 func buildSchemaQuery(kind schema.Kind, group, name string, sinceRevision 
int64) *propertyv1.QueryRequest {
        query := &propertyv1.QueryRequest{
                Groups: []string{schema.SchemaGroup},
@@ -219,24 +241,39 @@ func buildSchemaQuery(kind schema.Kind, group, name 
string, sinceRevision int64)
        return query
 }
 
-func buildUpdatedSchemasQuery(sinceRevision int64) *propertyv1.QueryRequest {
-       query := &propertyv1.QueryRequest{
-               Groups: []string{schema.SchemaGroup},
+// ParsePropID parses a property ID into kind, group, and name.
+// Format: "<kind>_<group>/<name>" or "<kind>_<name>" (for KindGroup).
+func ParsePropID(propID string) (kind schema.Kind, group, name string, err 
error) {
+       underscoreIdx := -1
+       for idx := range propID {
+               if propID[idx] == '_' {
+                       underscoreIdx = idx
+                       break
+               }
        }
-       if sinceRevision > 0 {
-               query.Criteria = &modelv1.Criteria{
-                       Exp: &modelv1.Criteria_Condition{
-                               Condition: &modelv1.Condition{
-                                       Name: TagKeyUpdatedAt,
-                                       Op:   modelv1.Condition_BINARY_OP_GT,
-                                       Value: &modelv1.TagValue{
-                                               Value: 
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: sinceRevision}},
-                                       },
-                               },
-                       },
+       if underscoreIdx < 0 {
+               return 0, "", "", fmt.Errorf("invalid propID format: %s", 
propID)
+       }
+       kindStr := propID[:underscoreIdx]
+       rest := propID[underscoreIdx+1:]
+       kind, err = KindFromString(kindStr)
+       if err != nil {
+               return 0, "", "", err
+       }
+       if kind == schema.KindGroup {
+               return kind, "", rest, nil
+       }
+       slashIdx := -1
+       for idx := range rest {
+               if rest[idx] == '/' {
+                       slashIdx = idx
+                       break
                }
        }
-       return query
+       if slashIdx < 0 {
+               return 0, "", "", fmt.Errorf("invalid propID format (missing 
group/name separator): %s", propID)
+       }
+       return kind, rest[:slashIdx], rest[slashIdx+1:], nil
 }
 
 // buildDeleteRequest builds a DeleteSchema request.
diff --git a/banyand/metadata/schema/property/converter_test.go 
b/banyand/metadata/schema/property/converter_test.go
index e9393e34c..5c14d9832 100644
--- a/banyand/metadata/schema/property/converter_test.go
+++ b/banyand/metadata/schema/property/converter_test.go
@@ -417,17 +417,30 @@ func TestBuildSchemaQuery_WithNameIgnoresSinceRevision(t 
*testing.T) {
        assert.Nil(t, query.GetCriteria(), "name-based lookup should not have 
criteria even with sinceRevision")
 }
 
-func TestBuildUpdatedSchemasQuery_Zero(t *testing.T) {
-       query := buildUpdatedSchemasQuery(0)
-       assert.Nil(t, query.GetCriteria())
-       assert.Empty(t, query.GetName())
-       assert.Equal(t, []string{schema.SchemaGroup}, query.GetGroups())
-}
-
-func TestBuildUpdatedSchemasQuery_Positive(t *testing.T) {
-       query := buildUpdatedSchemasQuery(100)
-       assert.NotNil(t, query.GetCriteria())
-       assert.Equal(t, []string{schema.SchemaGroup}, query.GetGroups())
+func TestParsePropID(t *testing.T) {
+       kind, group, name, err := ParsePropID("stream_g1/s1")
+       require.NoError(t, err)
+       assert.Equal(t, schema.KindStream, kind)
+       assert.Equal(t, "g1", group)
+       assert.Equal(t, "s1", name)
+
+       kind, group, name, err = ParsePropID("group_my-group")
+       require.NoError(t, err)
+       assert.Equal(t, schema.KindGroup, kind)
+       assert.Empty(t, group)
+       assert.Equal(t, "my-group", name)
+
+       invalidKind, invalidGroup, invalidName, err := ParsePropID("invalid")
+       assert.Error(t, err)
+       assert.Equal(t, schema.Kind(0), invalidKind)
+       assert.Empty(t, invalidGroup)
+       assert.Empty(t, invalidName)
+
+       unknownKind, unknownGroup, unknownName, err := 
ParsePropID("unknown_g1/s1")
+       assert.Error(t, err)
+       assert.Equal(t, schema.Kind(0), unknownKind)
+       assert.Empty(t, unknownGroup)
+       assert.Empty(t, unknownName)
 }
 
 func TestBuildDeleteRequest(t *testing.T) {
diff --git a/banyand/metadata/schema/property/schema_client.go 
b/banyand/metadata/schema/property/schema_client.go
index 84d878988..0197418a3 100644
--- a/banyand/metadata/schema/property/schema_client.go
+++ b/banyand/metadata/schema/property/schema_client.go
@@ -45,6 +45,7 @@ var _ grpchelper.Client = (*schemaClient)(nil)
 
// connectionHandler reacts to schema-server node lifecycle transitions and
// builds per-node gRPC clients.
type connectionHandler struct {
	l              *logger.Logger
	// registry owns the node watch streams; launchWatch/stopWatch are driven
	// from OnActive/OnInactive. May be nil (both callbacks nil-check it).
	registry       *SchemaRegistry
	caCertReloader *pkgtls.Reloader
	tlsEnabled     bool
}
@@ -81,11 +82,17 @@ func (h *connectionHandler) NewClient(conn 
*grpc.ClientConn, _ *databasev1.Node)
 }
 
 // OnActive is called when a node transitions to active.
-func (h *connectionHandler) OnActive(name string, _ *schemaClient) {
+func (h *connectionHandler) OnActive(name string, client *schemaClient) {
        h.l.Info().Str("node", name).Msg("schema server node is active")
+       if h.registry != nil {
+               h.registry.launchWatch(name, client)
+       }
 }
 
 // OnInactive is called when a node leaves active.
 func (h *connectionHandler) OnInactive(name string, _ *schemaClient) {
        h.l.Info().Str("node", name).Msg("schema server node is inactive")
+       if h.registry != nil {
+               h.registry.stopWatch(name)
+       }
 }
diff --git a/banyand/metadata/schema/schemaserver/grpc.go 
b/banyand/metadata/schema/schemaserver/grpc.go
index 5929cbe92..e1b3a6272 100644
--- a/banyand/metadata/schema/schemaserver/grpc.go
+++ b/banyand/metadata/schema/schemaserver/grpc.go
@@ -19,6 +19,8 @@ package schemaserver
 
 import (
        "context"
+       "errors"
+       "io"
        "time"
 
        "google.golang.org/grpc"
@@ -26,6 +28,7 @@ import (
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/encoding/protojson"
 
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -55,9 +58,10 @@ func newServerMetrics(factory observability.Factory) 
*serverMetrics {
 
// schemaManagementServer implements the SchemaManagementService RPCs on top
// of the property database and broadcasts each mutation to the connected
// WatchSchemas streams.
type schemaManagementServer struct {
	schemav1.UnimplementedSchemaManagementServiceServer
	server   *server
	watchers *watcherManager // fan-out of mutation events to watch streams
	l        *logger.Logger
	metrics  *serverMetrics
}
 
 // InsertSchema inserts a new schema property.
@@ -101,6 +105,10 @@ func (s *schemaManagementServer) InsertSchema(ctx 
context.Context, req *schemav1
                s.l.Error().Err(updateErr).Msg("failed to insert schema")
                return nil, updateErr
        }
+       s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+               EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT,
+               Property:  req.Property,
+       })
        return &schemav1.InsertSchemaResponse{}, nil
 }
 
@@ -128,6 +136,10 @@ func (s *schemaManagementServer) UpdateSchema(ctx 
context.Context, req *schemav1
                s.l.Error().Err(updateErr).Msg("failed to update schema")
                return nil, updateErr
        }
+       s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+               EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE,
+               Property:  req.Property,
+       })
        return &schemav1.UpdateSchemaResponse{}, nil
 }
 
@@ -206,11 +218,19 @@ func (s *schemaManagementServer) DeleteSchema(ctx 
context.Context, req *schemav1
                return &schemav1.DeleteSchemaResponse{Found: false}, nil
        }
        ids := make([][]byte, 0, len(results))
+       var deletedProps []*propertyv1.Property
        for _, result := range results {
                if result.DeleteTime() > 0 {
                        continue
                }
                ids = append(ids, result.ID())
+               var p propertyv1.Property
+               if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr != nil {
+                       s.metrics.totalErr.Inc(1, "delete")
+                       s.l.Error().Err(unmarshalErr).Msg("failed to unmarshal 
property for delete broadcast")
+                       return nil, unmarshalErr
+               }
+               deletedProps = append(deletedProps, &p)
        }
        if len(ids) == 0 {
                return &schemav1.DeleteSchemaResponse{Found: false}, nil
@@ -220,6 +240,14 @@ func (s *schemaManagementServer) DeleteSchema(ctx 
context.Context, req *schemav1
                s.l.Error().Err(deleteErr).Msg("failed to delete schema")
                return nil, deleteErr
        }
+       deleteTimeNano := req.UpdateAt.AsTime().UnixNano()
+       for _, dp := range deletedProps {
+               s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+                       EventType:  
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE,
+                       Property:   dp,
+                       DeleteTime: deleteTimeNano,
+               })
+       }
        return &schemav1.DeleteSchemaResponse{Found: true}, nil
 }
 
@@ -252,47 +280,119 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
// schemaUpdateServer implements the SchemaUpdateService, serving the
// WatchSchemas bi-directional stream (live pushes plus on-demand replay).
type schemaUpdateServer struct {
	schemav1.UnimplementedSchemaUpdateServiceServer
	server   *server
	watchers *watcherManager // subscription source for live mutation events
	l        *logger.Logger
	metrics  *serverMetrics
}
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-       req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-       s.metrics.totalStarted.Inc(1, "aggregate")
-       start := time.Now()
-       defer func() {
-               s.metrics.totalFinished.Inc(1, "aggregate")
-               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
// WatchSchemas implements bi-directional streaming for schema change events.
// Clients send requests on the stream to trigger replay (with max_revision and metadata_only).
// The server continuously pushes live events and replays historical data on request.
func (s *schemaUpdateServer) WatchSchemas(
	stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, schemav1.WatchSchemasResponse],
) error {
	// Subscribe to live mutation broadcasts for the lifetime of this stream.
	id, ch := s.watchers.Subscribe()
	defer s.watchers.Unsubscribe(id)

	// Recv blocks, so run it in its own goroutine and funnel requests and
	// errors through buffered channels into the select loop below. Both
	// channels have capacity 1 so the goroutine can hand off its final
	// value and exit even if the loop has already returned.
	reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
	reqErrCh := make(chan error, 1)
	go func() {
		for {
			req, err := stream.Recv()
			if err != nil {
				// io.EOF means the client closed its send side: end the
				// request feed cleanly; any other error is surfaced.
				if errors.Is(err, io.EOF) {
					close(reqCh)
					return
				}
				reqErrCh <- err
				close(reqCh)
				return
			}
			reqCh <- req
		}
	}()

	for {
		select {
		case <-stream.Context().Done():
			return stream.Context().Err()
		case err := <-reqErrCh:
			return err
		case req, ok := <-reqCh:
			// A closed reqCh (client EOF) terminates the stream without error.
			if !ok {
				return nil
			}
			// Each client request triggers a replay of matching history,
			// terminated by an explicit REPLAY_DONE marker.
			if replayErr := s.replaySchemas(stream, req); replayErr != nil {
				return replayErr
			}
			if doneErr := stream.Send(&schemav1.WatchSchemasResponse{
				EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
			}); doneErr != nil {
				return doneErr
			}
		case resp, ok := <-ch:
			// Live broadcast from the watcher manager; a closed channel
			// ends the stream cleanly.
			if !ok {
				return nil
			}
			if sendErr := stream.Send(resp); sendErr != nil {
				return sendErr
			}
		}
	}
}
+
// replaySchemas streams the currently stored schema properties matching the
// request's criteria back to the client. When the request carries a tag
// projection, only the projected tags are sent (metadata-only replay).
// Entries carrying a delete marker are replayed as DELETE events with their
// delete timestamp; all others are replayed as INSERT.
func (s *schemaUpdateServer) replaySchemas(
	stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, schemav1.WatchSchemasResponse],
	req *schemav1.WatchSchemasRequest,
) error {
	// A non-empty projection signals the client wants only metadata tags.
	metadataOnly := len(req.TagProjection) > 0
	query := &propertyv1.QueryRequest{
		Groups:   []string{schema.SchemaGroup},
		Criteria: req.Criteria,
	}
	results, err := s.server.db.Query(stream.Context(), query)
	if err != nil {
		return err
	}
	for _, result := range results {
		var p propertyv1.Property
		if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr != nil {
			s.metrics.totalErr.Inc(1, "replay")
			return unmarshalErr
		}
		if metadataOnly {
			p.Tags = filterTags(p.Tags, req.TagProjection)
		}
		// Deleted entries replay as DELETE with their timestamp; others as INSERT.
		eventType := schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT
		dt := result.DeleteTime()
		if dt > 0 {
			eventType = schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE
		}
		if sendErr := stream.Send(&schemav1.WatchSchemasResponse{
			EventType:    eventType,
			Property:     &p,
			MetadataOnly: metadataOnly,
			DeleteTime:   dt,
		}); sendErr != nil {
			return sendErr
		}
	}
	return nil
}
+
+func filterTags(tags []*modelv1.Tag, projection []string) []*modelv1.Tag {
+       allowed := make(map[string]struct{}, len(projection))
+       for _, key := range projection {
+               allowed[key] = struct{}{}
+       }
+       filtered := make([]*modelv1.Tag, 0, len(projection))
+       for _, tag := range tags {
+               if _, ok := allowed[tag.GetKey()]; ok {
+                       filtered = append(filtered, tag)
+               }
        }
-       return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+       return filtered
 }
 
 func errInvalidRequest(msg string) error {
diff --git a/banyand/metadata/schema/schemaserver/grpc_test.go 
b/banyand/metadata/schema/schemaserver/grpc_test.go
index 3de1812e6..dd6a7eb32 100644
--- a/banyand/metadata/schema/schemaserver/grpc_test.go
+++ b/banyand/metadata/schema/schemaserver/grpc_test.go
@@ -397,39 +397,152 @@ func TestRepairSchema(t *testing.T) {
        })
 }
 
-func TestAggregateSchemaUpdates(t *testing.T) {
-       t.Run("nil query", func(t *testing.T) {
-               _, upd := startTestServer(t)
-               _, rpcErr := upd.AggregateSchemaUpdates(context.Background(), 
&schemav1.AggregateSchemaUpdatesRequest{})
-               require.Error(t, rpcErr)
-               assert.Equal(t, codes.InvalidArgument, grpcstatus.Code(rpcErr))
-               assert.Contains(t, rpcErr.Error(), "query is required")
-       })
-       t.Run("empty results", func(t *testing.T) {
-               _, upd := startTestServer(t)
-               resp, rpcErr := 
upd.AggregateSchemaUpdates(context.Background(), 
&schemav1.AggregateSchemaUpdatesRequest{
-                       Query: &propertyv1.QueryRequest{Groups: 
[]string{schema.SchemaGroup}, Name: "nonexistent"},
-               })
-               require.NoError(t, rpcErr)
-               assert.Empty(t, resp.Names)
-       })
-       t.Run("single result", func(t *testing.T) {
+func TestWatchSchemas_BiDirReplay(t *testing.T) {
+       t.Run("replay_from_revision", func(t *testing.T) {
                mgmt, upd := startTestServer(t)
+               // Insert a property.
                insertProperty(t, mgmt, "svc-a", "id1")
-               resp, rpcErr := 
upd.AggregateSchemaUpdates(context.Background(), 
&schemav1.AggregateSchemaUpdatesRequest{
-                       Query: &propertyv1.QueryRequest{Groups: 
[]string{schema.SchemaGroup}, Name: "svc-a"},
+               // Wait a bit then insert another.
+               time.Sleep(10 * time.Millisecond)
+               // Get the first property's updatedAt to use as max_revision.
+               stream, streamErr := mgmt.ListSchemas(context.Background(), 
&schemav1.ListSchemasRequest{
+                       Query: &propertyv1.QueryRequest{Groups: 
[]string{schema.SchemaGroup}, Name: "svc-a", Ids: []string{"id1"}},
                })
-               require.NoError(t, rpcErr)
-               assert.Equal(t, []string{"svc-a"}, resp.Names)
+               require.NoError(t, streamErr)
+               responses, recvErr := collectListResponses(stream)
+               require.NoError(t, recvErr)
+               require.Len(t, responses, 1)
+               // Insert a second property after the first.
+               insertProperty(t, mgmt, "svc-b", "id2")
+               // Start bidi watch with max_revision = 0 (replay all).
+               ctx, cancel := context.WithCancel(context.Background())
+               defer cancel()
+               watchStream, watchErr := upd.WatchSchemas(ctx)
+               require.NoError(t, watchErr)
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{}))
+               // Collect events until REPLAY_DONE.
+               var events []*schemav1.WatchSchemasResponse
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+                       events = append(events, resp)
+               }
+               assert.GreaterOrEqual(t, len(events), 2, "should replay all 
properties")
        })
-       t.Run("duplicates deduped", func(t *testing.T) {
+
+       t.Run("metadata_only_replay", func(t *testing.T) {
                mgmt, upd := startTestServer(t)
-               insertProperty(t, mgmt, "svc-a", "id1")
-               insertProperty(t, mgmt, "svc-a", "id2")
-               resp, rpcErr := 
upd.AggregateSchemaUpdates(context.Background(), 
&schemav1.AggregateSchemaUpdatesRequest{
-                       Query: &propertyv1.QueryRequest{Groups: 
[]string{schema.SchemaGroup}, Name: "svc-a"},
+               insertProperty(t, mgmt, "svc-meta", "id-meta")
+               ctx, cancel := context.WithCancel(context.Background())
+               defer cancel()
+               watchStream, watchErr := upd.WatchSchemas(ctx)
+               require.NoError(t, watchErr)
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{
+                       TagProjection: []string{"kind", "group", "name", 
"updated_at"},
+               }))
+               var events []*schemav1.WatchSchemasResponse
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+                       events = append(events, resp)
+               }
+               require.NotEmpty(t, events)
+               for _, evt := range events {
+                       assert.True(t, evt.MetadataOnly, "replay events should 
have metadata_only=true")
+                       // Verify that only projected tags are returned, not 
the full property tags.
+                       allowedKeys := map[string]bool{"kind": true, "group": 
true, "name": true, "updated_at": true}
+                       for _, tag := range evt.Property.GetTags() {
+                               assert.True(t, allowedKeys[tag.GetKey()],
+                                       "unexpected tag key %q in metadata_only 
replay; only projected tags should be returned", tag.GetKey())
+                       }
+               }
+       })
+
+       t.Run("live_events_after_replay", func(t *testing.T) {
+               mgmt, upd := startTestServer(t)
+               insertProperty(t, mgmt, "svc-live", "id-live")
+               ctx, cancel := context.WithCancel(context.Background())
+               defer cancel()
+               watchStream, watchErr := upd.WatchSchemas(ctx)
+               require.NoError(t, watchErr)
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{}))
+               // Drain replay.
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+               }
+               // Insert new data - should come through as live event.
+               insertProperty(t, mgmt, "svc-new", "id-new")
+               resp, err := watchStream.Recv()
+               require.NoError(t, err)
+               assert.Equal(t, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, resp.EventType)
+               assert.Equal(t, "id-new", resp.Property.GetId())
+       })
+
+       t.Run("multiple_requests", func(t *testing.T) {
+               mgmt, upd := startTestServer(t)
+               insertProperty(t, mgmt, "svc-multi", "id-multi")
+               ctx, cancel := context.WithCancel(context.Background())
+               defer cancel()
+               watchStream, watchErr := upd.WatchSchemas(ctx)
+               require.NoError(t, watchErr)
+               // First request.
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{}))
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+               }
+               // Second request on same stream (metadata only).
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{
+                       TagProjection: []string{"kind", "group", "name", 
"updated_at"},
+               }))
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+                       assert.True(t, resp.MetadataOnly)
+               }
+       })
+
+       t.Run("replay_includes_deleted", func(t *testing.T) {
+               mgmt, upd := startTestServer(t)
+               insertProperty(t, mgmt, "svc-del", "id-del")
+               _, deleteErr := mgmt.DeleteSchema(context.Background(), 
&schemav1.DeleteSchemaRequest{
+                       Delete:   &propertyv1.DeleteRequest{Group: 
schema.SchemaGroup, Name: "svc-del", Id: "id-del"},
+                       UpdateAt: timestamppb.Now(),
                })
-               require.NoError(t, rpcErr)
-               assert.Equal(t, []string{"svc-a"}, resp.Names)
+               require.NoError(t, deleteErr)
+               ctx, cancel := context.WithCancel(context.Background())
+               defer cancel()
+               watchStream, watchErr := upd.WatchSchemas(ctx)
+               require.NoError(t, watchErr)
+               require.NoError(t, 
watchStream.Send(&schemav1.WatchSchemasRequest{}))
+               var hasDeleteEvent bool
+               for {
+                       resp, err := watchStream.Recv()
+                       require.NoError(t, err)
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               break
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE && resp.Property.GetId() == 
"id-del" {
+                               hasDeleteEvent = true
+                               assert.Greater(t, resp.DeleteTime, int64(0), 
"delete event should carry a non-zero delete_time")
+                       }
+               }
+               assert.True(t, hasDeleteEvent, "replay should include deleted 
entries as DELETE events")
        })
 }
diff --git a/banyand/metadata/schema/schemaserver/service.go 
b/banyand/metadata/schema/schemaserver/service.go
index 4f613faf2..8944d93db 100644
--- a/banyand/metadata/schema/schemaserver/service.go
+++ b/banyand/metadata/schema/schemaserver/service.go
@@ -80,6 +80,7 @@ type server struct {
        l                        *logger.Logger
        ser                      *grpclib.Server
        tlsReloader              *pkgtls.Reloader
+       watchers                 *watcherManager
        schemaService            *schemaManagementServer
        updateService            *schemaUpdateServer
        host                     string
@@ -114,6 +115,14 @@ func (s *server) GetPort() *uint32 {
        return &s.port
 }
 
+// WatcherCount returns the number of active schema watchers.
+func (s *server) WatcherCount() int {
+       if s.watchers == nil {
+               return 0
+       }
+       return s.watchers.Count()
+}
+
 // RegisterGossip registers the DB's repair gRPC services with the gossip 
messenger.
 func (s *server) RegisterGossip(messenger gossip.Messenger) {
        s.db.RegisterGossip(messenger)
@@ -176,17 +185,20 @@ func (s *server) PreRun(_ context.Context) error {
        s.l = logger.GetLogger("schema-server")
        s.lfs = fs.NewLocalFileSystem()
 
+       s.watchers = newWatcherManager(s.l)
        grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
        sm := newServerMetrics(grpcFactory)
        s.schemaService = &schemaManagementServer{
-               server:  s,
-               l:       s.l,
-               metrics: sm,
+               server:   s,
+               watchers: s.watchers,
+               l:        s.l,
+               metrics:  sm,
        }
        s.updateService = &schemaUpdateServer{
-               server:  s,
-               l:       s.l,
-               metrics: sm,
+               server:   s,
+               watchers: s.watchers,
+               l:        s.l,
+               metrics:  sm,
        }
 
        if s.tls {
@@ -307,6 +319,9 @@ func (s *server) Serve() run.StopNotify {
 }
 
 func (s *server) GracefulStop() {
+       if s.watchers != nil {
+               s.watchers.Close()
+       }
        if s.tlsReloader != nil {
                s.tlsReloader.Stop()
        }
diff --git a/banyand/metadata/schema/schemaserver/watcher.go 
b/banyand/metadata/schema/schemaserver/watcher.go
new file mode 100644
index 000000000..ab726c866
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/watcher.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "sync"
+
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const watcherChannelSize = 512
+
// watcherManager tracks the event channels of all active schema watchers
// and fans broadcast events out to them without blocking on slow consumers.
type watcherManager struct {
	l        *logger.Logger // sink for subscribe/broadcast diagnostics
	watchers map[uint64]chan *schemav1.WatchSchemasResponse // active watcher channels keyed by watcher ID
	mu       sync.RWMutex // guards watchers and nextID
	nextID   uint64 // monotonically increasing source of watcher IDs
}
+
+func newWatcherManager(l *logger.Logger) *watcherManager {
+       return &watcherManager{
+               l:        l,
+               watchers: make(map[uint64]chan *schemav1.WatchSchemasResponse),
+       }
+}
+
+// Subscribe registers a new watcher and returns its ID and event channel.
+func (wm *watcherManager) Subscribe() (uint64, <-chan 
*schemav1.WatchSchemasResponse) {
+       wm.mu.Lock()
+       defer wm.mu.Unlock()
+       wm.nextID++
+       id := wm.nextID
+       ch := make(chan *schemav1.WatchSchemasResponse, watcherChannelSize)
+       wm.watchers[id] = ch
+       wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", 
len(wm.watchers)).Msg("watcher subscribed")
+       return id, ch
+}
+
+// Unsubscribe removes a watcher and closes its channel.
+func (wm *watcherManager) Unsubscribe(id uint64) {
+       wm.mu.Lock()
+       defer wm.mu.Unlock()
+       if ch, ok := wm.watchers[id]; ok {
+               delete(wm.watchers, id)
+               close(ch)
+               wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", 
len(wm.watchers)).Msg("watcher unsubscribed")
+       }
+}
+
+// Broadcast sends an event to all watchers without blocking.
+func (wm *watcherManager) Broadcast(resp *schemav1.WatchSchemasResponse) {
+       wm.mu.RLock()
+       defer wm.mu.RUnlock()
+       var propertyID string
+       if prop := resp.GetProperty(); prop != nil {
+               propertyID = prop.GetId()
+       }
+       wm.l.Debug().Stringer("eventType", resp.GetEventType()).
+               Str("propertyID", propertyID).
+               Int("watcherCount", len(wm.watchers)).
+               Msg("broadcasting schema event")
+       for id, ch := range wm.watchers {
+               select {
+               case ch <- resp:
+               default:
+                       wm.l.Warn().Uint64("watcherID", id).
+                               Stringer("eventType", resp.GetEventType()).
+                               Str("propertyID", propertyID).
+                               Msg("watcher channel full, dropping event")
+               }
+       }
+}
+
+// Count returns the number of active watchers.
+func (wm *watcherManager) Count() int {
+       wm.mu.RLock()
+       defer wm.mu.RUnlock()
+       return len(wm.watchers)
+}
+
+// Close removes all watchers and closes their channels.
+func (wm *watcherManager) Close() {
+       wm.mu.Lock()
+       defer wm.mu.Unlock()
+       for id, ch := range wm.watchers {
+               delete(wm.watchers, id)
+               close(ch)
+       }
+}
diff --git a/banyand/metadata/schema/schemaserver/watcher_test.go 
b/banyand/metadata/schema/schemaserver/watcher_test.go
new file mode 100644
index 000000000..57991d09c
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/watcher_test.go
@@ -0,0 +1,115 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestWatcherManager_SubscribeUnsubscribe(t *testing.T) {
+       wm := newWatcherManager(logger.GetLogger("test-watcher"))
+       defer wm.Close()
+       id1, ch1 := wm.Subscribe()
+       id2, ch2 := wm.Subscribe()
+       assert.NotEqual(t, id1, id2)
+       assert.NotNil(t, ch1)
+       assert.NotNil(t, ch2)
+       wm.Unsubscribe(id1)
+       // ch1 should be closed
+       _, ok := <-ch1
+       assert.False(t, ok)
+       // ch2 should still be open
+       wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT})
+       select {
+       case resp := <-ch2:
+               assert.Equal(t, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, resp.GetEventType())
+       case <-time.After(time.Second):
+               t.Fatal("expected event on ch2")
+       }
+}
+
+func TestWatcherManager_Broadcast(t *testing.T) {
+       wm := newWatcherManager(logger.GetLogger("test-watcher"))
+       defer wm.Close()
+       const numWatchers = 5
+       channels := make([]<-chan *schemav1.WatchSchemasResponse, numWatchers)
+       for i := range numWatchers {
+               _, channels[i] = wm.Subscribe()
+       }
+       resp := &schemav1.WatchSchemasResponse{EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE}
+       wm.Broadcast(resp)
+       for i, ch := range channels {
+               select {
+               case got := <-ch:
+                       assert.Equal(t, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE, got.GetEventType(), "watcher 
%d", i)
+               case <-time.After(time.Second):
+                       t.Fatalf("watcher %d did not receive event", i)
+               }
+       }
+}
+
+func TestWatcherManager_BroadcastNonBlocking(t *testing.T) {
+       wm := newWatcherManager(logger.GetLogger("test-watcher"))
+       defer wm.Close()
+       _, ch := wm.Subscribe()
+       // Fill the channel
+       for range watcherChannelSize {
+               wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT})
+       }
+       // This broadcast should not block even though the channel is full
+       done := make(chan struct{})
+       go func() {
+               wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE})
+               close(done)
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               t.Fatal("Broadcast blocked on full channel")
+       }
+       // Drain and verify we got watcherChannelSize events
+       count := 0
+       for range watcherChannelSize {
+               select {
+               case <-ch:
+                       count++
+               default:
+               }
+       }
+       assert.Equal(t, watcherChannelSize, count)
+}
+
+func TestWatcherManager_Close(t *testing.T) {
+       wm := newWatcherManager(logger.GetLogger("test-watcher"))
+       _, ch1 := wm.Subscribe()
+       _, ch2 := wm.Subscribe()
+       wm.Close()
+       _, ok1 := <-ch1
+       require.False(t, ok1, "ch1 should be closed")
+       _, ok2 := <-ch2
+       require.False(t, ok2, "ch2 should be closed")
+       // Broadcast after close should not panic
+       wm.Broadcast(&schemav1.WatchSchemasResponse{})
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 650a060bc..e208b001c 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -360,8 +360,6 @@
     - [RepairService](#banyandb-property-v1-RepairService)
   
 - [banyandb/schema/v1/internal.proto](#banyandb_schema_v1_internal-proto)
-    - 
[AggregateSchemaUpdatesRequest](#banyandb-schema-v1-AggregateSchemaUpdatesRequest)
-    - 
[AggregateSchemaUpdatesResponse](#banyandb-schema-v1-AggregateSchemaUpdatesResponse)
     - [DeleteSchemaRequest](#banyandb-schema-v1-DeleteSchemaRequest)
     - [DeleteSchemaResponse](#banyandb-schema-v1-DeleteSchemaResponse)
     - [InsertSchemaRequest](#banyandb-schema-v1-InsertSchemaRequest)
@@ -372,6 +370,10 @@
     - [RepairSchemaResponse](#banyandb-schema-v1-RepairSchemaResponse)
     - [UpdateSchemaRequest](#banyandb-schema-v1-UpdateSchemaRequest)
     - [UpdateSchemaResponse](#banyandb-schema-v1-UpdateSchemaResponse)
+    - [WatchSchemasRequest](#banyandb-schema-v1-WatchSchemasRequest)
+    - [WatchSchemasResponse](#banyandb-schema-v1-WatchSchemasResponse)
+  
+    - [SchemaEventType](#banyandb-schema-v1-SchemaEventType)
   
     - [SchemaManagementService](#banyandb-schema-v1-SchemaManagementService)
     - [SchemaUpdateService](#banyandb-schema-v1-SchemaUpdateService)
@@ -5440,36 +5442,6 @@ WriteResponse is the response contract for write
 
 
 
-<a name="banyandb-schema-v1-AggregateSchemaUpdatesRequest"></a>
-
-### AggregateSchemaUpdatesRequest
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| query | 
[banyandb.property.v1.QueryRequest](#banyandb-property-v1-QueryRequest) |  |  |
-
-
-
-
-
-
-<a name="banyandb-schema-v1-AggregateSchemaUpdatesResponse"></a>
-
-### AggregateSchemaUpdatesResponse
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| names | [string](#string) | repeated | which names/schemas has updates |
-
-
-
-
-
-
 <a name="banyandb-schema-v1-DeleteSchemaRequest"></a>
 
 ### DeleteSchemaRequest
@@ -5607,8 +5579,57 @@ WriteResponse is the response contract for write
 
 
 
+
+<a name="banyandb-schema-v1-WatchSchemasRequest"></a>
+
+### WatchSchemasRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) |  |  |
+| tag_projection | [string](#string) | repeated |  |
+
+
+
+
+
+
+<a name="banyandb-schema-v1-WatchSchemasResponse"></a>
+
+### WatchSchemasResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| event_type | [SchemaEventType](#banyandb-schema-v1-SchemaEventType) |  |  |
+| property | [banyandb.property.v1.Property](#banyandb-property-v1-Property) | 
 |  |
+| metadata_only | [bool](#bool) |  |  |
+| delete_time | [int64](#int64) |  | delete_time is the deletion timestamp in 
nanoseconds. 0 means not deleted, &gt;0 means the property was deleted at this 
time. |
+
+
+
+
+
  
 
+
+<a name="banyandb-schema-v1-SchemaEventType"></a>
+
+### SchemaEventType
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| SCHEMA_EVENT_TYPE_UNSPECIFIED | 0 |  |
+| SCHEMA_EVENT_TYPE_INSERT | 1 |  |
+| SCHEMA_EVENT_TYPE_UPDATE | 2 |  |
+| SCHEMA_EVENT_TYPE_DELETE | 3 |  |
+| SCHEMA_EVENT_TYPE_REPLAY_DONE | 4 |  |
+
+
  
 
  
@@ -5635,7 +5656,7 @@ WriteResponse is the response contract for write
 
 | Method Name | Request Type | Response Type | Description |
 | ----------- | ------------ | ------------- | ------------|
-| AggregateSchemaUpdates | 
[AggregateSchemaUpdatesRequest](#banyandb-schema-v1-AggregateSchemaUpdatesRequest)
 | 
[AggregateSchemaUpdatesResponse](#banyandb-schema-v1-AggregateSchemaUpdatesResponse)
 |  |
+| WatchSchemas | 
[WatchSchemasRequest](#banyandb-schema-v1-WatchSchemasRequest) stream | 
[WatchSchemasResponse](#banyandb-schema-v1-WatchSchemasResponse) stream |  |
 
  
 
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index b053b9363..f56c38c43 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -460,7 +460,6 @@ func standaloneServerWithAuth(config *ClusterConfig, path 
string, ports []int, s
                ff = append(ff,
                        "--schema-server-grpc-host=127.0.0.1",
                        fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
-                       "--schema-property-client-sync-interval=300ms",
                )
                config.addSchemaServerAddr(schemaAddr)
        } else {
@@ -654,7 +653,6 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
                flags = append(flags,
                        "--schema-server-grpc-host="+nodeHost,
                        fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
-                       "--schema-property-client-sync-interval=300ms",
                )
                config.addSchemaServerAddr(schemaAddr)
        } else {
@@ -769,9 +767,7 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags 
...string) (string, string
                flags = append(flags,
                        fmt.Sprintf("--node-discovery-file-path=%s", 
config.NodeDiscovery.FileWriter.Path()))
        }
-       if isPropertyMode {
-               flags = append(flags, "--schema-property-client-sync-interval", 
"1s")
-       } else {
+       if !isPropertyMode {
                flags = append(flags, "--etcd-endpoints", config.EtcdEndpoint)
        }
        closeFn := CMD(flags...)
diff --git a/test/e2e-v2/cases/cluster/docker-compose-property.yml 
b/test/e2e-v2/cases/cluster/docker-compose-property.yml
index 6d390b226..ede0538c6 100644
--- a/test/e2e-v2/cases/cluster/docker-compose-property.yml
+++ b/test/e2e-v2/cases/cluster/docker-compose-property.yml
@@ -27,7 +27,7 @@ services:
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: data
-    command: data --etcd-endpoints=http://etcd:2379 
--schema-registry-mode=property --schema-property-client-sync-interval=10s
+    command: data --etcd-endpoints=http://etcd:2379 
--schema-registry-mode=property
     networks:
       - e2e
 
@@ -35,7 +35,7 @@ services:
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: liaison
-    command: liaison --etcd-endpoints=http://etcd:2379 
--schema-registry-mode=property --schema-property-client-sync-interval=10s
+    command: liaison --etcd-endpoints=http://etcd:2379 
--schema-registry-mode=property
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml 
b/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
index dfaacc9d8..30bc73cb9 100644
--- a/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
@@ -18,7 +18,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone --schema-registry-mode=property 
--schema-property-client-sync-interval=10s
+    command: standalone --schema-registry-mode=property
     networks:
       - e2e
 
diff --git 
a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml 
b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
index 3844c1405..9c6d6f74d 100644
--- a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
@@ -20,7 +20,7 @@ services:
     extends:
       file: ../../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone --schema-registry-mode=property 
--schema-property-client-sync-interval=10s
+    command: standalone --schema-registry-mode=property
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml 
b/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
index 00f811d30..5613290fb 100644
--- a/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
@@ -20,7 +20,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone --schema-registry-mode=property 
--schema-property-client-sync-interval=10s
+    command: standalone --schema-registry-mode=property
     networks:
       - e2e
 
diff --git a/test/integration/distributed/lifecycle/property/suite_test.go 
b/test/integration/distributed/lifecycle/property/suite_test.go
index dca545eb1..22058909e 100644
--- a/test/integration/distributed/lifecycle/property/suite_test.go
+++ b/test/integration/distributed/lifecycle/property/suite_test.go
@@ -91,7 +91,6 @@ func init() {
                                "--schema-registry-mode=property",
                                "--node-discovery-mode=file",
                                fmt.Sprintf("--node-discovery-file-path=%s", 
dfWriter.Path()),
-                               "--schema-property-client-sync-interval=300ms",
                        },
                        StopFunc: func() {
                                closerLiaisonNode()

Reply via email to