mrproliu commented on code in PR #995:
URL: https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909954608


##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
                case <-r.closer.CloseNotify():
                        return
                case <-ticker.C:
-                       r.performSync(ctx)
+                       r.syncRound++
+                       if r.syncRound%r.fullReconcileEvery == 0 {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+                               r.performFullSync(ctx)
+                       } else {
+                               r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+                               r.performIncrementalSync(ctx)
+                       }
                }
        }
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-       round := atomic.AddUint64(&r.syncRound, 1)
-       isFullReconcile := r.shouldFullReconcile(round)
-       names := r.connMgr.ActiveNames()
-       if len(names) == 0 {
-               r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-               return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if old, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+               old.cancelFn()
+               delete(r.watchSessions, nodeName)
+       }
+       ctx, cancel := context.WithCancel(r.closer.Ctx())
+       session := &watchSession{
+               cancelFn:  cancel,
+               syncReqCh: make(chan *syncRequest, 1),
+               syncResCh: make(chan []*digestEntry, 1),
        }
-       r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-               Int("activeNodes", len(names)).Msg("performSync: starting")
-       collector := newSyncCollector()
-       if isFullReconcile {
-               r.collectFullSync(ctx, collector)
+       r.watchSessions[nodeName] = session
+       r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(ctx, nodeName, client.update, session)
+               }()
        } else {
-               r.collectIncrementalSync(ctx, collector)
+               cancel()
+               delete(r.watchSessions, nodeName)
        }
-       r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-       r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-       kindsToSync := r.getKindsToSync()
-       if len(kindsToSync) == 0 {
-               return
-       }
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               for _, kind := range kindsToSync {
-                       query := buildSchemaQuery(kind, "", "", 0)
-                       schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if queryErr != nil {
-                               return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-                       }
-                       if r.l.Debug().Enabled() {
-                               activeSchemas := 0
-                               deletedSchemas := 0
-                               for _, s := range schemas {
-                                       if s.deleteTime > 0 {
-                                               deletedSchemas++
-                                               continue
-                                       }
-                                       activeSchemas++
-                               }
-                               r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                                       Int("schemas", len(schemas)).
-                                       Int("activeSchemas", activeSchemas).
-                                       Int("deletedSchemas", deletedSchemas).
-                                       Msg("collectFullSync: collected")
-                       }
-                       collector.add(nodeName, kind, schemas)
-               }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       if session, exists := r.watchSessions[nodeName]; exists {
+               r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling 
watch session")
+               session.cancelFn()
+               delete(r.watchSessions, nodeName)
        }
 }
 
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector 
*syncCollector) {
-       sinceRevision := r.cache.GetMaxRevision()
-       r.l.Debug().Int64("sinceRevision", 
sinceRevision).Msg("collectIncrementalSync: querying updates")
-       broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-               updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx, 
sc.update, sinceRevision)
-               if queryErr != nil {
-                       return queryErr
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+       backoff := watchInitialBackoff
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
                }
-               r.l.Debug().Str("node", nodeName).Int("updatedKinds", 
len(updatedKindNames)).
-                       Strs("kinds", 
updatedKindNames).Msg("collectIncrementalSync: got updates")
-               for _, kindName := range updatedKindNames {
-                       kind, kindErr := KindFromString(kindName)
-                       if kindErr != nil {
-                               r.l.Warn().Str("kindName", 
kindName).Msg("unknown kind from aggregate update")
-                               continue
-                       }
-                       query := buildSchemaQuery(kind, "", "", sinceRevision)
-                       schemas, schemasErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-                       if schemasErr != nil {
-                               return fmt.Errorf("failed to query updated 
schemas for kind %s: %w", kind, schemasErr)
+               err := r.processWatchSession(ctx, updateClient, session)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
                        }
-                       r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-                               Int("schemas", 
len(schemas)).Msg("collectIncrementalSync: collected")
-                       collector.add(nodeName, kind, schemas)
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
                }
-               return nil
-       })
-       if broadcastErr != nil {
-               r.l.Error().Err(broadcastErr).Msg("failed to collect 
incremental sync from some nodes")
        }
 }
 
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
-       for kind, propMap := range collector.entries {
-               cachedEntries := r.cache.GetEntriesByKind(kind)
-               seen := make(map[string]bool, len(propMap))
-               for propID, s := range propMap {
-                       seen[propID] = true
-                       if s.deleteTime > 0 {
-                               if entry, exists := cachedEntries[propID]; 
exists {
-                                       r.handleDeletion(kind, propID, entry, 
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+       updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+       stream, err := updateClient.WatchSchemas(ctx)
+       if err != nil {
+               return err
+       }
+
+       maxRev := r.cache.GetMaxRevision()
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: 
sending initial replay request")
+       initReq := &schemav1.WatchSchemasRequest{
+               Criteria: buildRevisionCriteria(maxRev),
+       }
+       if sendErr := stream.Send(initReq); sendErr != nil {
+               return sendErr
+       }
+
+       recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+       recvErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       resp, recvErr := stream.Recv()
+                       if recvErr != nil {
+                               recvErrCh <- recvErr
+                               close(recvCh)
+                               return
+                       }
+                       recvCh <- resp
+               }
+       }()
+
+       var inSync bool
+       var metadataOnly bool
+       var digests []*digestEntry
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case err := <-recvErrCh:
+                       return err
+               case req := <-session.syncReqCh:
+                       r.l.Debug().Bool("metadataOnly", len(req.tagProjection) 
> 0).
+                               Bool("hasCriteria", req.criteria != 
nil).Msg("processWatchSession: sending sync request")
+                       if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+                               Criteria:      req.criteria,
+                               TagProjection: req.tagProjection,
+                       }); sendErr != nil {
+                               return sendErr
+                       }
+                       inSync = true
+                       metadataOnly = len(req.tagProjection) > 0
+                       digests = nil
+               case resp, ok := <-recvCh:
+                       if !ok {
+                               return <-recvErrCh
+                       }
+                       if resp.EventType == 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+                               r.l.Debug().Bool("inSync", 
inSync).Msg("processWatchSession: received REPLAY_DONE")
+                               if inSync {
+                                       r.l.Debug().Int("digestCount", 
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+                                       session.syncResCh <- digests
+                                       digests = nil
+                                       inSync = false
+                                       metadataOnly = false
                                }
                                continue
                        }
-                       md, convErr := ToSchema(kind, s.property)
-                       if convErr != nil {
-                               r.l.Warn().Err(convErr).Str("propID", 
propID).Msg("failed to convert property for reconcile")
-                               continue
+                       if inSync && metadataOnly {
+                               digests = append(digests, r.parseDigest(resp))
+                       } else {
+                               r.handleWatchEvent(resp)
                        }
-                       r.processInitialResourceFromProperty(kind, s.property, 
md.Spec.(proto.Message))
                }
        }
 }
 
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
-       r.mux.RLock()
-       handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
-       for kind := range r.handlers {
-               handlerKindSet[kind] = struct{}{}
-       }
-       r.mux.RUnlock()
-       for _, kind := range r.cache.GetCachedKinds() {
-               handlerKindSet[kind] = struct{}{}
-       }
-       kinds := make([]schema.Kind, 0, len(handlerKindSet))
-       for kind := range handlerKindSet {
-               kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) 
*digestEntry {
+       prop := resp.GetProperty()
+       parsed := ParseTags(prop.GetTags())
+       return &digestEntry{
+               propID:     prop.GetId(),
+               kind:       parsed.Kind,
+               group:      parsed.Group,
+               name:       parsed.Name,
+               revision:   parsed.UpdatedAt,
+               deleteTime: resp.GetDeleteTime(),
        }
-       return kinds
 }
 
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client 
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
-       query := buildUpdatedSchemasQuery(sinceRevision)
-       resp, rpcErr := client.AggregateSchemaUpdates(ctx, 
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
-       if rpcErr != nil {
-               return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) 
{
+       prop := resp.GetProperty()
+       if prop == nil {
+               return
+       }
+       parsed := ParseTags(prop.GetTags())
+       kindStr := parsed.Kind
+       if kindStr == "" {
+               kindStr = prop.GetMetadata().GetName()
+       }
+       kind, kindErr := KindFromString(kindStr)
+       if kindErr != nil {
+               r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in 
event")
+               return
+       }
+       r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", 
kind).
+               Str("propID", prop.GetId()).Msg("watch: received event")
+       switch resp.GetEventType() {
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+               md, convErr := ToSchema(kind, prop)
+               if convErr != nil {
+                       r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert property")
+                       return
+               }
+               r.processInitialResourceFromProperty(kind, prop, 
md.Spec.(proto.Message))
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+               propID := prop.GetId()
+               if r.cache.Delete(propID, resp.GetDeleteTime()) {
+                       md, convErr := ToSchema(kind, prop)
+                       if convErr != nil {
+                               r.l.Warn().Err(convErr).Stringer("kind", 
kind).Msg("watch: failed to convert deleted property")
+                               return
+                       }
+                       r.notifyHandlers(kind, md, true)
+               }
+       case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+               // handled by processWatchSession, not expected here
        }
-       return resp.GetNames(), nil
 }
 
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
-       return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+       sessions := r.getActiveSessions()
+       if len(sessions) == 0 {
+               return
+       }
+       maxRev := r.cache.GetMaxRevision()
+       req := &syncRequest{
+               criteria: buildRevisionCriteria(maxRev),
+       }
+       r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: 
requesting changes")
+       r.sendSyncRequest(ctx, sessions, req)
 }
 
-type kindGroupPair struct {
-       groupName string
-       kind      schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+       r.watchMu.Lock()
+       defer r.watchMu.Unlock()
+       sessions := make(map[string]*watchSession, len(r.watchSessions))
+       for name, s := range r.watchSessions {
+               sessions[name] = s
+       }
+       return sessions
 }
 
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc 
*schemaClient) error {
-       groups, listErr := r.listGroupFromClient(ctx, sc.management)
-       if listErr != nil {
-               return listErr
-       }
-       var pairs []kindGroupPair
-       for _, group := range groups {
-               r.processInitialResource(schema.KindGroup, group)
-               catalog := group.GetCatalog()
-               groupName := group.GetMetadata().GetName()
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindStream},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindMeasure},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTopNAggregation},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_TRACE:
-                       pairs = append(pairs,
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindTrace},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRule},
-                               kindGroupPair{groupName: groupName, kind: 
schema.KindIndexRuleBinding},
-                       )
-               case commonv1.Catalog_CATALOG_PROPERTY:
-                       pairs = append(pairs, kindGroupPair{groupName: 
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+       sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+       r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: 
dispatching to watch sessions")
+       for _, s := range sessions {
+               select {
+               case s.syncReqCh <- req:
                default:
-                       r.l.Error().Stringer("catalog", catalog).Str("group", 
groupName).Msg("unknown catalog type during initialization")
                }
        }
-       var wg sync.WaitGroup
-       for _, pair := range pairs {
-               currentPair := pair
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       r.initResourcesFromClient(ctx, sc.management, 
currentPair.kind, currentPair.groupName)
-               }()
+       allDigests := make(map[string][]*digestEntry, len(sessions))
+       for name, s := range sessions {
+               select {
+               case digests := <-s.syncResCh:
+                       allDigests[name] = digests
+               case <-time.After(30 * time.Second):
+                       r.l.Warn().Str("node", name).Msg("sync timeout")
+               case <-ctx.Done():
+                       return nil
+               }

Review Comment:
   Added a log message for when the sync request is skipped because the session's request channel is already full.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to