Copilot commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909877731


##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
        schemav1.UnimplementedSchemaUpdateServiceServer
-       server  *server
-       l       *logger.Logger
-       metrics *serverMetrics
+       server   *server
+       watchers *watcherManager
+       l        *logger.Logger
+       metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-       req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-       s.metrics.totalStarted.Inc(1, "aggregate")
-       start := time.Now()
-       defer func() {
-               s.metrics.totalFinished.Inc(1, "aggregate")
-               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+       stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+       id, ch := s.watchers.Subscribe()
+       defer s.watchers.Unsubscribe(id)
+
+       reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+       reqErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       req, err := stream.Recv()
+                       if err != nil {
+                               reqErrCh <- err
+                               close(reqCh)
+                               return
+                       }
+                       reqCh <- req
+               }
        }()
-       if req.Query == nil {
-               return nil, errInvalidRequest("query is required")
+
+       for {
+               select {
+               case <-stream.Context().Done():
+                       return stream.Context().Err()
+               case err := <-reqErrCh:
+                       return err
+               case req, ok := <-reqCh:

Review Comment:
   In WatchSchemas, when the client half-closes its send direction, 
stream.Recv() returns io.EOF. The receive goroutine forwards that on reqErrCh 
and then closes reqCh, so both select cases can become ready and the RPC 
nondeterministically returns either io.EOF (via reqErrCh) or nil (via the 
closed reqCh). Handle io.EOF explicitly (return nil) so that a normal client 
close is never surfaced as a stream error.



##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
        schemav1.UnimplementedSchemaUpdateServiceServer
-       server  *server
-       l       *logger.Logger
-       metrics *serverMetrics
+       server   *server
+       watchers *watcherManager
+       l        *logger.Logger
+       metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-       req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-       s.metrics.totalStarted.Inc(1, "aggregate")
-       start := time.Now()
-       defer func() {
-               s.metrics.totalFinished.Inc(1, "aggregate")
-               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+       stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+       id, ch := s.watchers.Subscribe()
+       defer s.watchers.Unsubscribe(id)
+
+       reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+       reqErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       req, err := stream.Recv()
+                       if err != nil {
+                               reqErrCh <- err
+                               close(reqCh)
+                               return
+                       }
+                       reqCh <- req
+               }
        }()
-       if req.Query == nil {
-               return nil, errInvalidRequest("query is required")
+
+       for {
+               select {
+               case <-stream.Context().Done():
+                       return stream.Context().Err()
+               case err := <-reqErrCh:
+                       return err
+               case req, ok := <-reqCh:
+                       if !ok {
+                               return nil
+                       }
+                       if replayErr := s.replaySchemas(stream, req); replayErr 
!= nil {
+                               return replayErr
+                       }
+                       if doneErr := 
stream.Send(&schemav1.WatchSchemasResponse{
+                               EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+                       }); doneErr != nil {
+                               return doneErr
+                       }
+               case resp, ok := <-ch:
+                       if !ok {
+                               return nil
+                       }
+                       if sendErr := stream.Send(resp); sendErr != nil {
+                               return sendErr
+                       }
+               }
        }
-       req.Query.Groups = []string{schema.SchemaGroup}
-       results, queryErr := s.server.db.Query(ctx, req.Query)
-       if queryErr != nil {
-               s.metrics.totalErr.Inc(1, "aggregate")
-               s.l.Error().Err(queryErr).Msg("failed to aggregate schema 
updates")
-               return nil, queryErr
+}
+
+func (s *schemaUpdateServer) replaySchemas(
+       stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+       req *schemav1.WatchSchemasRequest,
+) error {
+       metadataOnly := len(req.TagProjection) > 0
+       query := &propertyv1.QueryRequest{
+               Groups:   []string{schema.SchemaGroup},
+               Criteria: req.Criteria,
+       }
+       results, err := s.server.db.Query(stream.Context(), query)
+       if err != nil {
+               return err
        }
-       nameSet := make(map[string]struct{}, len(results))
        for _, result := range results {
                var p propertyv1.Property
                if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr != nil {
-                       s.metrics.totalErr.Inc(1, "aggregate")
-                       return nil, unmarshalErr
+                       s.l.Warn().Err(unmarshalErr).Msg("replay: failed to 
unmarshal property")
+                       continue
+               }

Review Comment:
   replaySchemas skips entries on protojson.Unmarshal failure and continues 
sending the rest. This can leave clients with a permanently incomplete replay 
(and no retry) while the server only logs a warning. Consider returning an 
error (so the stream retries) and/or incrementing metrics consistently with 
ListSchemas/DeleteSchema, instead of silently continuing.



##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
                case <-r.closer.CloseNotify():
                        return
                case <-ticker.C:
-                       r.performSync(ctx)
+                       r.syncRound++
+                       if r.syncRound%r.fullReconcileEvery == 0 {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+                               r.performFullSync(ctx)
+                       } else {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+                               r.performIncrementalSync(ctx)
+                       }
                }
        }
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-       round := atomic.AddUint64(&r.syncRound, 1)
-       isFullReconcile := r.shouldFullReconcile(round)
-       names := r.connMgr.ActiveNames()
-       if len(names) == 0 {
-               r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-               return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if old, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+               old.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       ctx, cancel := context.WithCancel(r.closer.Ctx())
+       session := &watchSession{
+               cancelFn:  cancel,
+               syncReqCh: make(chan *syncRequest, 1),
+               syncResCh: make(chan []*digestEntry, 1),
        }
-       r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-               Int("activeNodes", len(names)).Msg("performSync: starting")
-       collector := newSyncCollector()
-       if isFullReconcile {
-               r.collectFullSync(ctx, collector)
+       r.watchSessions[nodeName] = session
+       r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(ctx, nodeName, client.update, session)
+               }()
        } else {
-               r.collectIncrementalSync(ctx, collector)
+               cancel()
+               delete(r.watchSessions, nodeName)
        }
-       r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-       r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-       kindsToSync := r.getKindsToSync()
-       if len(kindsToSync) == 0 {
-               return
-       }
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               for _, kind := range kindsToSync {
-                       query := buildSchemaQuery(kind, "", "", 0)
-                       schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if queryErr != nil {
-                               return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-                       }
-                       if r.l.Debug().Enabled() {
-                               activeSchemas := 0
-                               deletedSchemas := 0
-                               for _, s := range schemas {
-                                       if s.deleteTime > 0 {
-                                               deletedSchemas++
-                                               continue
-                                       }
-                                       activeSchemas++
-                               }
-                               r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                                       Int("schemas", len(schemas)).
-                                       Int("activeSchemas", activeSchemas).
-                                       Int("deletedSchemas", deletedSchemas).
-                                       Msg("collectFullSync: collected")
-                       }
-                       collector.add(nodeName, kind, schemas)
-               }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if session, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling 
watch session")
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
        }
 }
 
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector 
*syncCollector) {
-       sinceRevision := r.cache.GetMaxRevision()
-       r.l.Debug().Int64("sinceRevision", 
sinceRevision).Msg("collectIncrementalSync: querying updates")
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, 
sc.update, sinceRevision)
-               if queryErr != nil {
-                       return queryErr
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+       backoff := watchInitialBackoff
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
                }
-               r.l.Debug().Str("node", nodeName).Int("updatedKinds", 
len(updatedKindNames)).
-                       Strs("kinds", 
updatedKindNames).Msg("collectIncrementalSync: got updates")
-               for _, kindName := range updatedKindNames {
-                       kind, kindErr := KindFromString(kindName)
-                       if kindErr != nil {
-                               r.l.Warn().Str("kindName", 
kindName).Msg("unknown kind from aggregate update")
-                               continue
-                       }
-                       query := buildSchemaQuery(kind, "", "", sinceRevision)
-                       schemas, schemasErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if schemasErr != nil {
-                               return fmt.Errorf("failed to query updated 
schemas for kind %s: %w", kind, schemasErr)
+               err := r.processWatchSession(ctx, updateClient, session)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
                        }
-                       r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                               Int("schemas", 
len(schemas)).Msg("collectIncrementalSync: collected")
-                       collector.add(nodeName, kind, schemas)
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
                }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect 
incremental sync from some nodes")
        }
 }
 
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
-       for kind, propMap := range collector.entries {
-               cachedEntries := r.cache.GetEntriesByKind(kind)
-               seen := make(map[string]bool, len(propMap))
-               for propID, s := range propMap {
-                       seen[propID] = true
-                       if s.deleteTime > 0 {
-                               if entry, exists := cachedEntries[propID]; 
exists {
-                                       r.handleDeletion(kind, propID, entry, 
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+       stream, err := updateClient.WatchSchemas(ctx)
+       if err != nil {
+               return err
+       }
+
+       maxRev := r.cache.GetMaxRevision()
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: 
sending initial replay request")
+       initReq := &schemav1.WatchSchemasRequest{
+               Criteria: buildRevisionCriteria(maxRev),
+       }
+       if sendErr := stream.Send(initReq); sendErr != nil {
+               return sendErr
+       }
+
+       recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+       recvErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       resp, recvErr := stream.Recv()
+                       if recvErr != nil {
+                               recvErrCh <- recvErr
+                               close(recvCh)
+                               return
+                       }
+                       recvCh <- resp
+               }
+       }()
+
+       var inSync bool
+       var metadataOnly bool
+       var digests []*digestEntry
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case err := <-recvErrCh:
+                       return err
+               case req := <-session.syncReqCh:
+                       r.l.Debug().Bool("metadataOnly", len(req.tagProjection) 
> 0).
+                               Bool("hasCriteria", req.criteria != 
nil).Msg("processWatchSession: sending sync request")
+                       if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+                               Criteria:      req.criteria,
+                               TagProjection: req.tagProjection,
+                       }); sendErr != nil {
+                               return sendErr
+                       }
+                       inSync = true
+                       metadataOnly = len(req.tagProjection) > 0
+                       digests = nil
+               case resp, ok := <-recvCh:
+                       if !ok {
+                               return <-recvErrCh
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               r.l.Debug().Bool("inSync", 
inSync).Msg("processWatchSession: received REPLAY_DONE")
+                               if inSync {
+                                       r.l.Debug().Int("digestCount", 
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+                                       session.syncResCh <- digests
+                                       digests = nil
+                                       inSync = false
+                                       metadataOnly = false
                                }
                                continue
                        }
-                       md, convErr := ToSchema(kind, s.property)
-                       if convErr != nil {
-                               r.l.Warn().Err(convErr).Str("propID", 
propID).Msg("failed to convert property for reconcile")
-                               continue
+                       if inSync && metadataOnly {
+                               digests = append(digests, r.parseDigest(resp))
+                       } else {
+                               r.handleWatchEvent(resp)
                        }
-                       r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
                }
        }
 }
 
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
-       r.mux.RLock()
-       handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
-       for kind := range r.handlers {
-               handlerKindSet[kind] = struct{}{}
-       }
-       r.mux.RUnlock()
-       for _, kind := range r.cache.GetCachedKinds() {
-               handlerKindSet[kind] = struct{}{}
-       }
-       kinds := make([]schema.Kind, 0, len(handlerKindSet))
-       for kind := range handlerKindSet {
-               kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) 
*digestEntry {
+       prop := resp.GetProperty()
+       parsed := ParseTags(prop.GetTags())
+       return &digestEntry{
+               propID:     prop.GetId(),
+               kind:       parsed.Kind,
+               group:      parsed.Group,
+               name:       parsed.Name,
+               revision:   parsed.UpdatedAt,
+               deleteTime: resp.GetDeleteTime(),
        }
-       return kinds
 }
 
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client 
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
-       query := buildUpdatedSchemasQuery(sinceRevision)
-       resp, rpcErr := client.AggregateSchemaUpdates(ctx, 
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
-       if rpcErr != nil {
-               return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) 
{
+       prop := resp.GetProperty()
+       if prop == nil {
+               return
+       }
+       parsed := ParseTags(prop.GetTags())
+       kindStr := parsed.Kind
+       if kindStr == "" {
+               kindStr = prop.GetMetadata().GetName()
+       }
+       kind, kindErr := KindFromString(kindStr)
+       if kindErr != nil {
+               r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in 
event")
+               return
+       }
+       r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", 
kind).
+               Str("propID", prop.GetId()).Msg("watch: received event")
+       switch resp.GetEventType() {
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+               md, convErr := ToSchema(kind, prop)
+               if convErr != nil {
+                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert property")
+                       return
+               }
+               r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+               propID := prop.GetId()
+               if r.cache.Delete(propID, resp.GetDeleteTime()) {
+                       md, convErr := ToSchema(kind, prop)
+                       if convErr != nil {
+                               r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert deleted property")
+                               return
+                       }
+                       r.notifyHandlers(kind, md, true)
+               }
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+               // handled by processWatchSession, not expected here
        }
-       return resp.GetNames(), nil
 }
 
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
-       return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               return
+       }
+       maxRev := r.cache.GetMaxRevision()
+       req := &syncRequest{
+               criteria: buildRevisionCriteria(maxRev),
+       }
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: 
requesting changes")
+       r.sendSyncRequest(ctx, sessions, req)
 }
 
-type kindGroupPair struct {
-       groupName string
-       kind      schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       sessions := make(map[string]*watchSession, len(r.watchSessions))
+       for name, s := range r.watchSessions {
+               sessions[name] = s
+       }
+       return sessions
 }
 
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc 
*schemaClient) error {
-       groups, listErr := r.listGroupFromClient(ctx, sc.management)
-       if listErr != nil {
-               return listErr
-       }
-       var pairs []kindGroupPair
-       for _, group := range groups {
-               r.processInitialResource(schema.KindGroup, group)
-               catalog := group.GetCatalog()
-               groupName := group.GetMetadata().GetName()
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindStream},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindMeasure},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTopNAggregation},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_TRACE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTrace},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_PROPERTY:
-                       pairs = append(pairs, kindGroupPair{groupName: 
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+       sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+       r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: 
dispatching to watch sessions")
+       for _, s := range sessions {
+               select {
+               case s.syncReqCh <- req:
                default:
-                       r.l.Error().Stringer("catalog", catalog).Str("group", 
groupName).Msg("unknown catalog type during initialization")
                }
        }
-       var wg sync.WaitGroup
-       for _, pair := range pairs {
-               currentPair := pair
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       r.initResourcesFromClient(ctx, sc.management, 
currentPair.kind, currentPair.groupName)
-               }()
+       allDigests := make(map[string][]*digestEntry, len(sessions))
+       for name, s := range sessions {
+               select {
+               case digests := <-s.syncResCh:
+                       allDigests[name] = digests
+               case <-time.After(30 * time.Second):
+                       r.l.Warn().Str("node", name).Msg("sync timeout")
+               case <-ctx.Done():
+                       return nil
+               }

Review Comment:
   sendSyncRequest drops a sync request when syncReqCh is full (default 
branch), but still waits for a response on syncResCh. If the send is dropped, 
this call will block until the 30s timeout for that session, stalling the sync 
loop and delaying reconciliation. Consider guaranteeing delivery (block until 
sent / overwrite previous pending request) or tracking which sessions actually 
received the request and only waiting for those.



##########
banyand/metadata/schema/schemaserver/watcher.go:
##########
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "sync"
+
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const watcherChannelSize = 512
+
+type watcherManager struct {
+       l        *logger.Logger
+       watchers map[uint64]chan *schemav1.WatchSchemasResponse
+       mu       sync.RWMutex
+       nextID   uint64
+}
+
+func newWatcherManager(l *logger.Logger) *watcherManager {
+       return &watcherManager{
+               l:        l,
+               watchers: make(map[uint64]chan *schemav1.WatchSchemasResponse),
+       }
+}
+
+// Subscribe registers a new watcher and returns its ID and event channel.
+func (wm *watcherManager) Subscribe() (uint64, <-chan 
*schemav1.WatchSchemasResponse) {
+       wm.mu.Lock()
+       defer wm.mu.Unlock()
+       wm.nextID++
+       id := wm.nextID
+       ch := make(chan *schemav1.WatchSchemasResponse, watcherChannelSize)
+       wm.watchers[id] = ch
+       wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", 
len(wm.watchers)).Msg("watcher subscribed")
+       return id, ch
+}
+
+// Unsubscribe removes a watcher and closes its channel.
+func (wm *watcherManager) Unsubscribe(id uint64) {
+       wm.mu.Lock()
+       defer wm.mu.Unlock()
+       if ch, ok := wm.watchers[id]; ok {
+               delete(wm.watchers, id)
+               close(ch)
+               wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", 
len(wm.watchers)).Msg("watcher unsubscribed")
+       }
+}
+
+// Broadcast sends an event to all watchers without blocking.
+func (wm *watcherManager) Broadcast(resp *schemav1.WatchSchemasResponse) {
+       wm.mu.RLock()
+       defer wm.mu.RUnlock()
+       wm.l.Debug().Stringer("eventType", resp.GetEventType()).
+               Str("propertyID", resp.GetProperty().GetId()).
+               Int("watcherCount", len(wm.watchers)).
+               Msg("broadcasting schema event")
+       for id, ch := range wm.watchers {
+               select {
+               case ch <- resp:
+               default:
+                       wm.l.Warn().Uint64("watcherID", id).
+                               Stringer("eventType", resp.GetEventType()).
+                               Str("propertyID", resp.GetProperty().GetId()).
+                               Msg("watcher channel full, dropping event")
+               }

Review Comment:
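   watcherManager.Broadcast logs resp.GetProperty().GetId() without checking if 
Property is nil. Broadcast can be invoked with responses like REPLAY_DONE or an 
empty response (see watcher_test) that carry no Property. Because getters 
generated by protoc-gen-go are nil-receiver safe, this does not panic; it logs 
an empty propertyID instead, which is misleading in the "broadcasting" and 
"dropping event" messages. Consider adding the propertyID field only when 
resp.GetProperty()!=nil, and keep using the generated getters (not direct field 
access) so the nil case stays safe.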



##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
                case <-r.closer.CloseNotify():
                        return
                case <-ticker.C:
-                       r.performSync(ctx)
+                       r.syncRound++
+                       if r.syncRound%r.fullReconcileEvery == 0 {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+                               r.performFullSync(ctx)
+                       } else {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+                               r.performIncrementalSync(ctx)
+                       }
                }
        }
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-       round := atomic.AddUint64(&r.syncRound, 1)
-       isFullReconcile := r.shouldFullReconcile(round)
-       names := r.connMgr.ActiveNames()
-       if len(names) == 0 {
-               r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-               return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if old, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+               old.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       ctx, cancel := context.WithCancel(r.closer.Ctx())
+       session := &watchSession{
+               cancelFn:  cancel,
+               syncReqCh: make(chan *syncRequest, 1),
+               syncResCh: make(chan []*digestEntry, 1),
        }
-       r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-               Int("activeNodes", len(names)).Msg("performSync: starting")
-       collector := newSyncCollector()
-       if isFullReconcile {
-               r.collectFullSync(ctx, collector)
+       r.watchSessions[nodeName] = session
+       r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(ctx, nodeName, client.update, session)
+               }()
        } else {
-               r.collectIncrementalSync(ctx, collector)
+               cancel()
+               delete(r.watchSessions, nodeName)
        }
-       r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-       r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-       kindsToSync := r.getKindsToSync()
-       if len(kindsToSync) == 0 {
-               return
-       }
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               for _, kind := range kindsToSync {
-                       query := buildSchemaQuery(kind, "", "", 0)
-                       schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if queryErr != nil {
-                               return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-                       }
-                       if r.l.Debug().Enabled() {
-                               activeSchemas := 0
-                               deletedSchemas := 0
-                               for _, s := range schemas {
-                                       if s.deleteTime > 0 {
-                                               deletedSchemas++
-                                               continue
-                                       }
-                                       activeSchemas++
-                               }
-                               r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                                       Int("schemas", len(schemas)).
-                                       Int("activeSchemas", activeSchemas).
-                                       Int("deletedSchemas", deletedSchemas).
-                                       Msg("collectFullSync: collected")
-                       }
-                       collector.add(nodeName, kind, schemas)
-               }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if session, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling 
watch session")
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
        }
 }
 
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector 
*syncCollector) {
-       sinceRevision := r.cache.GetMaxRevision()
-       r.l.Debug().Int64("sinceRevision", 
sinceRevision).Msg("collectIncrementalSync: querying updates")
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, 
sc.update, sinceRevision)
-               if queryErr != nil {
-                       return queryErr
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+       backoff := watchInitialBackoff
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
                }
-               r.l.Debug().Str("node", nodeName).Int("updatedKinds", 
len(updatedKindNames)).
-                       Strs("kinds", 
updatedKindNames).Msg("collectIncrementalSync: got updates")
-               for _, kindName := range updatedKindNames {
-                       kind, kindErr := KindFromString(kindName)
-                       if kindErr != nil {
-                               r.l.Warn().Str("kindName", 
kindName).Msg("unknown kind from aggregate update")
-                               continue
-                       }
-                       query := buildSchemaQuery(kind, "", "", sinceRevision)
-                       schemas, schemasErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if schemasErr != nil {
-                               return fmt.Errorf("failed to query updated 
schemas for kind %s: %w", kind, schemasErr)
+               err := r.processWatchSession(ctx, updateClient, session)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
                        }
-                       r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                               Int("schemas", 
len(schemas)).Msg("collectIncrementalSync: collected")
-                       collector.add(nodeName, kind, schemas)
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
                }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect 
incremental sync from some nodes")
        }
 }
 
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
-       for kind, propMap := range collector.entries {
-               cachedEntries := r.cache.GetEntriesByKind(kind)
-               seen := make(map[string]bool, len(propMap))
-               for propID, s := range propMap {
-                       seen[propID] = true
-                       if s.deleteTime > 0 {
-                               if entry, exists := cachedEntries[propID]; 
exists {
-                                       r.handleDeletion(kind, propID, entry, 
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+       stream, err := updateClient.WatchSchemas(ctx)
+       if err != nil {
+               return err
+       }
+
+       maxRev := r.cache.GetMaxRevision()
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: 
sending initial replay request")
+       initReq := &schemav1.WatchSchemasRequest{
+               Criteria: buildRevisionCriteria(maxRev),
+       }
+       if sendErr := stream.Send(initReq); sendErr != nil {
+               return sendErr
+       }
+
+       recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+       recvErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       resp, recvErr := stream.Recv()
+                       if recvErr != nil {
+                               recvErrCh <- recvErr
+                               close(recvCh)
+                               return
+                       }
+                       recvCh <- resp
+               }
+       }()
+
+       var inSync bool
+       var metadataOnly bool
+       var digests []*digestEntry
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case err := <-recvErrCh:
+                       return err
+               case req := <-session.syncReqCh:
+                       r.l.Debug().Bool("metadataOnly", len(req.tagProjection) 
> 0).
+                               Bool("hasCriteria", req.criteria != 
nil).Msg("processWatchSession: sending sync request")
+                       if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+                               Criteria:      req.criteria,
+                               TagProjection: req.tagProjection,
+                       }); sendErr != nil {
+                               return sendErr
+                       }
+                       inSync = true
+                       metadataOnly = len(req.tagProjection) > 0
+                       digests = nil
+               case resp, ok := <-recvCh:
+                       if !ok {
+                               return <-recvErrCh
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               r.l.Debug().Bool("inSync", 
inSync).Msg("processWatchSession: received REPLAY_DONE")
+                               if inSync {
+                                       r.l.Debug().Int("digestCount", 
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+                                       session.syncResCh <- digests
+                                       digests = nil
+                                       inSync = false
+                                       metadataOnly = false
                                }
                                continue
                        }
-                       md, convErr := ToSchema(kind, s.property)
-                       if convErr != nil {
-                               r.l.Warn().Err(convErr).Str("propID", 
propID).Msg("failed to convert property for reconcile")
-                               continue
+                       if inSync && metadataOnly {
+                               digests = append(digests, r.parseDigest(resp))
+                       } else {
+                               r.handleWatchEvent(resp)
                        }
-                       r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
                }
        }
 }
 
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
-       r.mux.RLock()
-       handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
-       for kind := range r.handlers {
-               handlerKindSet[kind] = struct{}{}
-       }
-       r.mux.RUnlock()
-       for _, kind := range r.cache.GetCachedKinds() {
-               handlerKindSet[kind] = struct{}{}
-       }
-       kinds := make([]schema.Kind, 0, len(handlerKindSet))
-       for kind := range handlerKindSet {
-               kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) 
*digestEntry {
+       prop := resp.GetProperty()
+       parsed := ParseTags(prop.GetTags())
+       return &digestEntry{
+               propID:     prop.GetId(),
+               kind:       parsed.Kind,
+               group:      parsed.Group,
+               name:       parsed.Name,
+               revision:   parsed.UpdatedAt,
+               deleteTime: resp.GetDeleteTime(),
        }
-       return kinds
 }
 
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client 
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
-       query := buildUpdatedSchemasQuery(sinceRevision)
-       resp, rpcErr := client.AggregateSchemaUpdates(ctx, 
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
-       if rpcErr != nil {
-               return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) 
{
+       prop := resp.GetProperty()
+       if prop == nil {
+               return
+       }
+       parsed := ParseTags(prop.GetTags())
+       kindStr := parsed.Kind
+       if kindStr == "" {
+               kindStr = prop.GetMetadata().GetName()
+       }
+       kind, kindErr := KindFromString(kindStr)
+       if kindErr != nil {
+               r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in 
event")
+               return
+       }
+       r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", 
kind).
+               Str("propID", prop.GetId()).Msg("watch: received event")
+       switch resp.GetEventType() {
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+               md, convErr := ToSchema(kind, prop)
+               if convErr != nil {
+                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert property")
+                       return
+               }
+               r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+               propID := prop.GetId()
+               if r.cache.Delete(propID, resp.GetDeleteTime()) {
+                       md, convErr := ToSchema(kind, prop)
+                       if convErr != nil {
+                               r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert deleted property")
+                               return
+                       }
+                       r.notifyHandlers(kind, md, true)
+               }
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+               // handled by processWatchSession, not expected here
        }
-       return resp.GetNames(), nil
 }
 
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
-       return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               return
+       }
+       maxRev := r.cache.GetMaxRevision()
+       req := &syncRequest{
+               criteria: buildRevisionCriteria(maxRev),
+       }
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: 
requesting changes")
+       r.sendSyncRequest(ctx, sessions, req)
 }
 
-type kindGroupPair struct {
-       groupName string
-       kind      schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       sessions := make(map[string]*watchSession, len(r.watchSessions))
+       for name, s := range r.watchSessions {
+               sessions[name] = s
+       }
+       return sessions
 }
 
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc 
*schemaClient) error {
-       groups, listErr := r.listGroupFromClient(ctx, sc.management)
-       if listErr != nil {
-               return listErr
-       }
-       var pairs []kindGroupPair
-       for _, group := range groups {
-               r.processInitialResource(schema.KindGroup, group)
-               catalog := group.GetCatalog()
-               groupName := group.GetMetadata().GetName()
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindStream},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindMeasure},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTopNAggregation},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_TRACE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTrace},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_PROPERTY:
-                       pairs = append(pairs, kindGroupPair{groupName: 
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+       sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+       r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: 
dispatching to watch sessions")
+       for _, s := range sessions {
+               select {
+               case s.syncReqCh <- req:
                default:
-                       r.l.Error().Stringer("catalog", catalog).Str("group", 
groupName).Msg("unknown catalog type during initialization")
                }
        }
-       var wg sync.WaitGroup
-       for _, pair := range pairs {
-               currentPair := pair
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       r.initResourcesFromClient(ctx, sc.management, 
currentPair.kind, currentPair.groupName)
-               }()
+       allDigests := make(map[string][]*digestEntry, len(sessions))
+       for name, s := range sessions {
+               select {
+               case digests := <-s.syncResCh:
+                       allDigests[name] = digests
+               case <-time.After(30 * time.Second):
+                       r.l.Warn().Str("node", name).Msg("sync timeout")
+               case <-ctx.Done():
+                       return nil
+               }
        }
-       wg.Wait()
-       return nil
+       return allDigests
 }
 
-func (r *SchemaRegistry) listGroupFromClient(ctx context.Context, client 
schemav1.SchemaManagementServiceClient) ([]*commonv1.Group, error) {
-       query := buildSchemaQuery(schema.KindGroup, "", "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               return nil, queryErr
+func (r *SchemaRegistry) performFullSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               r.l.Debug().Msg("performFullSync: no active watch sessions, 
skipping")
+               return
        }
-       var groups []*commonv1.Group
-       for _, s := range results {
-               if s.deleteTime > 0 {
-                       continue
-               }
-               md, convErr := ToSchema(schema.KindGroup, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Msg("failed to convert group 
property")
-                       continue
-               }
-               g, ok := md.Spec.(*commonv1.Group)
-               if !ok {
-                       continue
-               }
-               groups = append(groups, g)
+       req := &syncRequest{
+               tagProjection: basicTagKeys,
        }
-       return groups, nil
-}
-
-func (r *SchemaRegistry) initResourcesFromClient(ctx context.Context,
-       client schemav1.SchemaManagementServiceClient, kind schema.Kind, 
groupName string,
-) {
-       query := buildSchemaQuery(kind, groupName, "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               r.l.Warn().Err(queryErr).Stringer("kind", kind).Str("group", 
groupName).Msg("failed to list resources for init")
+       allDigests := r.sendSyncRequest(ctx, sessions, req)
+       if allDigests == nil {
                return
        }
-       for _, s := range results {
-               if s.deleteTime > 0 {
+
+       merged := r.mergeDigests(allDigests)
+       cachedEntries := r.cache.GetAllEntries()
+       r.l.Debug().Int("mergedCount", len(merged)).Int("cachedCount", 
len(cachedEntries)).
+               Msg("performFullSync: comparing server digests with local 
cache")
+
+       for propID, d := range merged {
+               if d.deleteTime > 0 {
+                       if entry, exists := cachedEntries[propID]; exists {
+                               kind, kindErr := KindFromString(d.kind)
+                               if kindErr != nil {
+                                       r.l.Warn().Str("kind", 
d.kind).Msg("full sync: unknown kind")
+                                       continue
+                               }
+                               r.handleDeletion(kind, propID, entry, 
d.revision)
+                       }
                        continue
                }
-               md, convErr := ToSchema(kind, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("failed to convert property for init")
-                       continue
+               localEntry := cachedEntries[propID]
+               if localEntry == nil || localEntry.latestUpdateAt < d.revision {
+                       kind, kindErr := KindFromString(d.kind)
+                       if kindErr != nil {
+                               r.l.Warn().Str("kind", d.kind).Msg("full sync: 
unknown kind")
+                               continue
+                       }
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
kind).
+                               Int64("serverRevision", 
d.revision).Msg("performFullSync: fetching updated schema from server")
+                       prop, err := r.getSchema(ctx, kind, d.group, d.name)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to get schema")
+                               continue
+                       }
+                       if prop == nil {
+                               continue
+                       }
+                       md, err := ToSchema(kind, prop)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to convert property")
+                               continue
+                       }
+                       r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+               }
+       }
+
+       for propID, entry := range cachedEntries {
+               if _, exists := merged[propID]; !exists {
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
entry.kind).
+                               Msg("performFullSync: cached entry not found on 
server, deleting")
+                       r.handleDeletion(entry.kind, propID, entry, 
entry.latestUpdateAt)
                }
-               r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
        }
 }
 
-func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec 
proto.Message) {
-       prop, convErr := SchemaToProperty(kind, spec)
-       if convErr != nil {
-               r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to 
convert spec to property for initial resource")
-               return
+func (r *SchemaRegistry) mergeDigests(allDigests map[string][]*digestEntry) 
map[string]*digestEntry {
+       merged := make(map[string]*digestEntry)
+       for _, digests := range allDigests {
+               for _, d := range digests {
+                       existing, exists := merged[d.propID]
+                       if !exists || d.revision > existing.revision {
+                               merged[d.propID] = d
+                       }
+               }

Review Comment:
   mergeDigests only prefers the highest revision; when revisions are equal 
(common for deletes since updated_at doesn't change), the selected digest is 
nondeterministic due to map iteration order and may flip between 
deleted/non-deleted across runs. Add a deterministic tie-breaker (e.g., for 
equal revision prefer non-deleted, or prefer the larger deleteTime) to avoid 
flakiness and incorrect deletions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to