mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909956253


##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
                case <-r.closer.CloseNotify():
                        return
                case <-ticker.C:
-                       r.performSync(ctx)
+                       r.syncRound++
+                       if r.syncRound%r.fullReconcileEvery == 0 {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+                               r.performFullSync(ctx)
+                       } else {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+                               r.performIncrementalSync(ctx)
+                       }
                }
        }
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-       round := atomic.AddUint64(&r.syncRound, 1)
-       isFullReconcile := r.shouldFullReconcile(round)
-       names := r.connMgr.ActiveNames()
-       if len(names) == 0 {
-               r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-               return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if old, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+               old.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       ctx, cancel := context.WithCancel(r.closer.Ctx())
+       session := &watchSession{
+               cancelFn:  cancel,
+               syncReqCh: make(chan *syncRequest, 1),
+               syncResCh: make(chan []*digestEntry, 1),
        }
-       r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-               Int("activeNodes", len(names)).Msg("performSync: starting")
-       collector := newSyncCollector()
-       if isFullReconcile {
-               r.collectFullSync(ctx, collector)
+       r.watchSessions[nodeName] = session
+       r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(ctx, nodeName, client.update, session)
+               }()
        } else {
-               r.collectIncrementalSync(ctx, collector)
+               cancel()
+               delete(r.watchSessions, nodeName)
        }
-       r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-       r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-       kindsToSync := r.getKindsToSync()
-       if len(kindsToSync) == 0 {
-               return
-       }
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               for _, kind := range kindsToSync {
-                       query := buildSchemaQuery(kind, "", "", 0)
-                       schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if queryErr != nil {
-                               return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-                       }
-                       if r.l.Debug().Enabled() {
-                               activeSchemas := 0
-                               deletedSchemas := 0
-                               for _, s := range schemas {
-                                       if s.deleteTime > 0 {
-                                               deletedSchemas++
-                                               continue
-                                       }
-                                       activeSchemas++
-                               }
-                               r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                                       Int("schemas", len(schemas)).
-                                       Int("activeSchemas", activeSchemas).
-                                       Int("deletedSchemas", deletedSchemas).
-                                       Msg("collectFullSync: collected")
-                       }
-                       collector.add(nodeName, kind, schemas)
-               }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if session, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling 
watch session")
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
        }
 }
 
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector 
*syncCollector) {
-       sinceRevision := r.cache.GetMaxRevision()
-       r.l.Debug().Int64("sinceRevision", 
sinceRevision).Msg("collectIncrementalSync: querying updates")
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, 
sc.update, sinceRevision)
-               if queryErr != nil {
-                       return queryErr
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+       backoff := watchInitialBackoff
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
                }
-               r.l.Debug().Str("node", nodeName).Int("updatedKinds", 
len(updatedKindNames)).
-                       Strs("kinds", 
updatedKindNames).Msg("collectIncrementalSync: got updates")
-               for _, kindName := range updatedKindNames {
-                       kind, kindErr := KindFromString(kindName)
-                       if kindErr != nil {
-                               r.l.Warn().Str("kindName", 
kindName).Msg("unknown kind from aggregate update")
-                               continue
-                       }
-                       query := buildSchemaQuery(kind, "", "", sinceRevision)
-                       schemas, schemasErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if schemasErr != nil {
-                               return fmt.Errorf("failed to query updated 
schemas for kind %s: %w", kind, schemasErr)
+               err := r.processWatchSession(ctx, updateClient, session)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
                        }
-                       r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                               Int("schemas", 
len(schemas)).Msg("collectIncrementalSync: collected")
-                       collector.add(nodeName, kind, schemas)
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
                }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect 
incremental sync from some nodes")
        }
 }
 
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
-       for kind, propMap := range collector.entries {
-               cachedEntries := r.cache.GetEntriesByKind(kind)
-               seen := make(map[string]bool, len(propMap))
-               for propID, s := range propMap {
-                       seen[propID] = true
-                       if s.deleteTime > 0 {
-                               if entry, exists := cachedEntries[propID]; 
exists {
-                                       r.handleDeletion(kind, propID, entry, 
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+       stream, err := updateClient.WatchSchemas(ctx)
+       if err != nil {
+               return err
+       }
+
+       maxRev := r.cache.GetMaxRevision()
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: 
sending initial replay request")
+       initReq := &schemav1.WatchSchemasRequest{
+               Criteria: buildRevisionCriteria(maxRev),
+       }
+       if sendErr := stream.Send(initReq); sendErr != nil {
+               return sendErr
+       }
+
+       recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+       recvErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       resp, recvErr := stream.Recv()
+                       if recvErr != nil {
+                               recvErrCh <- recvErr
+                               close(recvCh)
+                               return
+                       }
+                       recvCh <- resp
+               }
+       }()
+
+       var inSync bool
+       var metadataOnly bool
+       var digests []*digestEntry
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case err := <-recvErrCh:
+                       return err
+               case req := <-session.syncReqCh:
+                       r.l.Debug().Bool("metadataOnly", len(req.tagProjection) 
> 0).
+                               Bool("hasCriteria", req.criteria != 
nil).Msg("processWatchSession: sending sync request")
+                       if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+                               Criteria:      req.criteria,
+                               TagProjection: req.tagProjection,
+                       }); sendErr != nil {
+                               return sendErr
+                       }
+                       inSync = true
+                       metadataOnly = len(req.tagProjection) > 0
+                       digests = nil
+               case resp, ok := <-recvCh:
+                       if !ok {
+                               return <-recvErrCh
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               r.l.Debug().Bool("inSync", 
inSync).Msg("processWatchSession: received REPLAY_DONE")
+                               if inSync {
+                                       r.l.Debug().Int("digestCount", 
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+                                       session.syncResCh <- digests
+                                       digests = nil
+                                       inSync = false
+                                       metadataOnly = false
                                }
                                continue
                        }
-                       md, convErr := ToSchema(kind, s.property)
-                       if convErr != nil {
-                               r.l.Warn().Err(convErr).Str("propID", 
propID).Msg("failed to convert property for reconcile")
-                               continue
+                       if inSync && metadataOnly {
+                               digests = append(digests, r.parseDigest(resp))
+                       } else {
+                               r.handleWatchEvent(resp)
                        }
-                       r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
                }
        }
 }
 
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
-       r.mux.RLock()
-       handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
-       for kind := range r.handlers {
-               handlerKindSet[kind] = struct{}{}
-       }
-       r.mux.RUnlock()
-       for _, kind := range r.cache.GetCachedKinds() {
-               handlerKindSet[kind] = struct{}{}
-       }
-       kinds := make([]schema.Kind, 0, len(handlerKindSet))
-       for kind := range handlerKindSet {
-               kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) 
*digestEntry {
+       prop := resp.GetProperty()
+       parsed := ParseTags(prop.GetTags())
+       return &digestEntry{
+               propID:     prop.GetId(),
+               kind:       parsed.Kind,
+               group:      parsed.Group,
+               name:       parsed.Name,
+               revision:   parsed.UpdatedAt,
+               deleteTime: resp.GetDeleteTime(),
        }
-       return kinds
 }
 
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client 
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
-       query := buildUpdatedSchemasQuery(sinceRevision)
-       resp, rpcErr := client.AggregateSchemaUpdates(ctx, 
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
-       if rpcErr != nil {
-               return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) 
{
+       prop := resp.GetProperty()
+       if prop == nil {
+               return
+       }
+       parsed := ParseTags(prop.GetTags())
+       kindStr := parsed.Kind
+       if kindStr == "" {
+               kindStr = prop.GetMetadata().GetName()
+       }
+       kind, kindErr := KindFromString(kindStr)
+       if kindErr != nil {
+               r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in 
event")
+               return
+       }
+       r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", 
kind).
+               Str("propID", prop.GetId()).Msg("watch: received event")
+       switch resp.GetEventType() {
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+               md, convErr := ToSchema(kind, prop)
+               if convErr != nil {
+                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert property")
+                       return
+               }
+               r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+               propID := prop.GetId()
+               if r.cache.Delete(propID, resp.GetDeleteTime()) {
+                       md, convErr := ToSchema(kind, prop)
+                       if convErr != nil {
+                               r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert deleted property")
+                               return
+                       }
+                       r.notifyHandlers(kind, md, true)
+               }
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+               // handled by processWatchSession, not expected here
        }
-       return resp.GetNames(), nil
 }
 
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
-       return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               return
+       }
+       maxRev := r.cache.GetMaxRevision()
+       req := &syncRequest{
+               criteria: buildRevisionCriteria(maxRev),
+       }
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: 
requesting changes")
+       r.sendSyncRequest(ctx, sessions, req)
 }
 
-type kindGroupPair struct {
-       groupName string
-       kind      schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       sessions := make(map[string]*watchSession, len(r.watchSessions))
+       for name, s := range r.watchSessions {
+               sessions[name] = s
+       }
+       return sessions
 }
 
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc 
*schemaClient) error {
-       groups, listErr := r.listGroupFromClient(ctx, sc.management)
-       if listErr != nil {
-               return listErr
-       }
-       var pairs []kindGroupPair
-       for _, group := range groups {
-               r.processInitialResource(schema.KindGroup, group)
-               catalog := group.GetCatalog()
-               groupName := group.GetMetadata().GetName()
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindStream},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindMeasure},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTopNAggregation},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_TRACE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTrace},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_PROPERTY:
-                       pairs = append(pairs, kindGroupPair{groupName: 
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+       sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+       r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: 
dispatching to watch sessions")
+       for _, s := range sessions {
+               select {
+               case s.syncReqCh <- req:
                default:
-                       r.l.Error().Stringer("catalog", catalog).Str("group", 
groupName).Msg("unknown catalog type during initialization")
                }
        }
-       var wg sync.WaitGroup
-       for _, pair := range pairs {
-               currentPair := pair
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       r.initResourcesFromClient(ctx, sc.management, 
currentPair.kind, currentPair.groupName)
-               }()
+       allDigests := make(map[string][]*digestEntry, len(sessions))
+       for name, s := range sessions {
+               select {
+               case digests := <-s.syncResCh:
+                       allDigests[name] = digests
+               case <-time.After(30 * time.Second):
+                       r.l.Warn().Str("node", name).Msg("sync timeout")
+               case <-ctx.Done():
+                       return nil
+               }
        }
-       wg.Wait()
-       return nil
+       return allDigests
 }
 
-func (r *SchemaRegistry) listGroupFromClient(ctx context.Context, client 
schemav1.SchemaManagementServiceClient) ([]*commonv1.Group, error) {
-       query := buildSchemaQuery(schema.KindGroup, "", "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               return nil, queryErr
+func (r *SchemaRegistry) performFullSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               r.l.Debug().Msg("performFullSync: no active watch sessions, 
skipping")
+               return
        }
-       var groups []*commonv1.Group
-       for _, s := range results {
-               if s.deleteTime > 0 {
-                       continue
-               }
-               md, convErr := ToSchema(schema.KindGroup, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Msg("failed to convert group 
property")
-                       continue
-               }
-               g, ok := md.Spec.(*commonv1.Group)
-               if !ok {
-                       continue
-               }
-               groups = append(groups, g)
+       req := &syncRequest{
+               tagProjection: basicTagKeys,
        }
-       return groups, nil
-}
-
-func (r *SchemaRegistry) initResourcesFromClient(ctx context.Context,
-       client schemav1.SchemaManagementServiceClient, kind schema.Kind, 
groupName string,
-) {
-       query := buildSchemaQuery(kind, groupName, "", 0)
-       results, queryErr := r.querySchemasFromClient(ctx, client, query)
-       if queryErr != nil {
-               r.l.Warn().Err(queryErr).Stringer("kind", kind).Str("group", 
groupName).Msg("failed to list resources for init")
+       allDigests := r.sendSyncRequest(ctx, sessions, req)
+       if allDigests == nil {
                return
        }
-       for _, s := range results {
-               if s.deleteTime > 0 {
+
+       merged := r.mergeDigests(allDigests)
+       cachedEntries := r.cache.GetAllEntries()
+       r.l.Debug().Int("mergedCount", len(merged)).Int("cachedCount", 
len(cachedEntries)).
+               Msg("performFullSync: comparing server digests with local 
cache")
+
+       for propID, d := range merged {
+               if d.deleteTime > 0 {
+                       if entry, exists := cachedEntries[propID]; exists {
+                               kind, kindErr := KindFromString(d.kind)
+                               if kindErr != nil {
+                                       r.l.Warn().Str("kind", 
d.kind).Msg("full sync: unknown kind")
+                                       continue
+                               }
+                               r.handleDeletion(kind, propID, entry, 
d.revision)
+                       }
                        continue
                }
-               md, convErr := ToSchema(kind, s.property)
-               if convErr != nil {
-                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("failed to convert property for init")
-                       continue
+               localEntry := cachedEntries[propID]
+               if localEntry == nil || localEntry.latestUpdateAt < d.revision {
+                       kind, kindErr := KindFromString(d.kind)
+                       if kindErr != nil {
+                               r.l.Warn().Str("kind", d.kind).Msg("full sync: 
unknown kind")
+                               continue
+                       }
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
kind).
+                               Int64("serverRevision", 
d.revision).Msg("performFullSync: fetching updated schema from server")
+                       prop, err := r.getSchema(ctx, kind, d.group, d.name)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to get schema")
+                               continue
+                       }
+                       if prop == nil {
+                               continue
+                       }
+                       md, err := ToSchema(kind, prop)
+                       if err != nil {
+                               r.l.Warn().Err(err).Str("propID", 
propID).Msg("full sync: failed to convert property")
+                               continue
+                       }
+                       r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+               }
+       }
+
+       for propID, entry := range cachedEntries {
+               if _, exists := merged[propID]; !exists {
+                       r.l.Debug().Str("propID", propID).Stringer("kind", 
entry.kind).
+                               Msg("performFullSync: cached entry not found on 
server, deleting")
+                       r.handleDeletion(entry.kind, propID, entry, 
entry.latestUpdateAt)
                }
-               r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
        }
 }
 
-func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec 
proto.Message) {
-       prop, convErr := SchemaToProperty(kind, spec)
-       if convErr != nil {
-               r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to 
convert spec to property for initial resource")
-               return
+func (r *SchemaRegistry) mergeDigests(allDigests map[string][]*digestEntry) 
map[string]*digestEntry {
+       merged := make(map[string]*digestEntry)
+       for _, digests := range allDigests {
+               for _, d := range digests {
+                       existing, exists := merged[d.propID]
+                       if !exists || d.revision > existing.revision {
+                               merged[d.propID] = d
+                       }
+               }

Review Comment:
   Fixed the delete-time checker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to