Copilot commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909877731
##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l *logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
Review Comment:
In WatchSchemas, when the client closes its send direction, stream.Recv()
returns io.EOF. The receive goroutine forwards that error on reqErrCh and then
closes reqCh, so the main select loop nondeterministically either returns
io.EOF (via the reqErrCh case) or terminates cleanly (via the closed-reqCh
case). Handle io.EOF explicitly in the goroutine — e.g., close reqCh without
sending on reqErrCh when the error is io.EOF — so the RPC always ends with a
clean nil instead of sometimes surfacing io.EOF as a stream error.
##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l *logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
+ if !ok {
+ return nil
+ }
+ if replayErr := s.replaySchemas(stream, req); replayErr
!= nil {
+ return replayErr
+ }
+ if doneErr :=
stream.Send(&schemav1.WatchSchemasResponse{
+ EventType:
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+ }); doneErr != nil {
+ return doneErr
+ }
+ case resp, ok := <-ch:
+ if !ok {
+ return nil
+ }
+ if sendErr := stream.Send(resp); sendErr != nil {
+ return sendErr
+ }
+ }
}
- req.Query.Groups = []string{schema.SchemaGroup}
- results, queryErr := s.server.db.Query(ctx, req.Query)
- if queryErr != nil {
- s.metrics.totalErr.Inc(1, "aggregate")
- s.l.Error().Err(queryErr).Msg("failed to aggregate schema
updates")
- return nil, queryErr
+}
+
+func (s *schemaUpdateServer) replaySchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+ req *schemav1.WatchSchemasRequest,
+) error {
+ metadataOnly := len(req.TagProjection) > 0
+ query := &propertyv1.QueryRequest{
+ Groups: []string{schema.SchemaGroup},
+ Criteria: req.Criteria,
+ }
+ results, err := s.server.db.Query(stream.Context(), query)
+ if err != nil {
+ return err
}
- nameSet := make(map[string]struct{}, len(results))
for _, result := range results {
var p propertyv1.Property
if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr != nil {
- s.metrics.totalErr.Inc(1, "aggregate")
- return nil, unmarshalErr
+ s.l.Warn().Err(unmarshalErr).Msg("replay: failed to
unmarshal property")
+ continue
+ }
Review Comment:
replaySchemas skips entries on protojson.Unmarshal failure and continues
sending the rest. This can leave clients with a permanently incomplete replay
(and no retry) while the server only logs a warning. Consider returning an
error (so the stream retries) and/or incrementing metrics consistently with
ListSchemas/DeleteSchema, instead of silently continuing.
##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context)
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active
connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing
existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch
session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for
full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling
watch session")
+ session.cancelFn()
+ delete(r.watchSessions, nodeName)
}
}
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector
*syncCollector) {
- sinceRevision := r.cache.GetMaxRevision()
- r.l.Debug().Int64("sinceRevision",
sinceRevision).Msg("collectIncrementalSync: querying updates")
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx,
sc.update, sinceRevision)
- if queryErr != nil {
- return queryErr
+const (
+ watchInitialBackoff = time.Second
+ watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+ backoff := watchInitialBackoff
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
}
- r.l.Debug().Str("node", nodeName).Int("updatedKinds",
len(updatedKindNames)).
- Strs("kinds",
updatedKindNames).Msg("collectIncrementalSync: got updates")
- for _, kindName := range updatedKindNames {
- kind, kindErr := KindFromString(kindName)
- if kindErr != nil {
- r.l.Warn().Str("kindName",
kindName).Msg("unknown kind from aggregate update")
- continue
- }
- query := buildSchemaQuery(kind, "", "", sinceRevision)
- schemas, schemasErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if schemasErr != nil {
- return fmt.Errorf("failed to query updated
schemas for kind %s: %w", kind, schemasErr)
+ err := r.processWatchSession(ctx, updateClient, session)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch:
server does not support WatchSchemas, stopping watch")
+ return
}
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas",
len(schemas)).Msg("collectIncrementalSync: collected")
- collector.add(nodeName, kind, schemas)
+ r.l.Warn().Err(err).Str("node",
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ }
+ backoff *= 2
+ if backoff > watchMaxBackoff {
+ backoff = watchMaxBackoff
}
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect
incremental sync from some nodes")
}
}
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
- for kind, propMap := range collector.entries {
- cachedEntries := r.cache.GetEntriesByKind(kind)
- seen := make(map[string]bool, len(propMap))
- for propID, s := range propMap {
- seen[propID] = true
- if s.deleteTime > 0 {
- if entry, exists := cachedEntries[propID];
exists {
- r.handleDeletion(kind, propID, entry,
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+ stream, err := updateClient.WatchSchemas(ctx)
+ if err != nil {
+ return err
+ }
+
+ maxRev := r.cache.GetMaxRevision()
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession:
sending initial replay request")
+ initReq := &schemav1.WatchSchemasRequest{
+ Criteria: buildRevisionCriteria(maxRev),
+ }
+ if sendErr := stream.Send(initReq); sendErr != nil {
+ return sendErr
+ }
+
+ recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+ recvErrCh := make(chan error, 1)
+ go func() {
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ recvErrCh <- recvErr
+ close(recvCh)
+ return
+ }
+ recvCh <- resp
+ }
+ }()
+
+ var inSync bool
+ var metadataOnly bool
+ var digests []*digestEntry
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-recvErrCh:
+ return err
+ case req := <-session.syncReqCh:
+ r.l.Debug().Bool("metadataOnly", len(req.tagProjection)
> 0).
+ Bool("hasCriteria", req.criteria !=
nil).Msg("processWatchSession: sending sync request")
+ if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+ Criteria: req.criteria,
+ TagProjection: req.tagProjection,
+ }); sendErr != nil {
+ return sendErr
+ }
+ inSync = true
+ metadataOnly = len(req.tagProjection) > 0
+ digests = nil
+ case resp, ok := <-recvCh:
+ if !ok {
+ return <-recvErrCh
+ }
+ if resp.EventType ==
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+ r.l.Debug().Bool("inSync",
inSync).Msg("processWatchSession: received REPLAY_DONE")
+ if inSync {
+ r.l.Debug().Int("digestCount",
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+ session.syncResCh <- digests
+ digests = nil
+ inSync = false
+ metadataOnly = false
}
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Str("propID",
propID).Msg("failed to convert property for reconcile")
- continue
+ if inSync && metadataOnly {
+ digests = append(digests, r.parseDigest(resp))
+ } else {
+ r.handleWatchEvent(resp)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
}
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
- r.mux.RLock()
- handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
- for kind := range r.handlers {
- handlerKindSet[kind] = struct{}{}
- }
- r.mux.RUnlock()
- for _, kind := range r.cache.GetCachedKinds() {
- handlerKindSet[kind] = struct{}{}
- }
- kinds := make([]schema.Kind, 0, len(handlerKindSet))
- for kind := range handlerKindSet {
- kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse)
*digestEntry {
+ prop := resp.GetProperty()
+ parsed := ParseTags(prop.GetTags())
+ return &digestEntry{
+ propID: prop.GetId(),
+ kind: parsed.Kind,
+ group: parsed.Group,
+ name: parsed.Name,
+ revision: parsed.UpdatedAt,
+ deleteTime: resp.GetDeleteTime(),
}
- return kinds
}
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
- query := buildUpdatedSchemasQuery(sinceRevision)
- resp, rpcErr := client.AggregateSchemaUpdates(ctx,
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
- if rpcErr != nil {
- return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse)
{
+ prop := resp.GetProperty()
+ if prop == nil {
+ return
+ }
+ parsed := ParseTags(prop.GetTags())
+ kindStr := parsed.Kind
+ if kindStr == "" {
+ kindStr = prop.GetMetadata().GetName()
+ }
+ kind, kindErr := KindFromString(kindStr)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in
event")
+ return
+ }
+ r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind",
kind).
+ Str("propID", prop.GetId()).Msg("watch: received event")
+ switch resp.GetEventType() {
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert property")
+ return
+ }
+ r.processInitialResourceFromProperty(kind, prop,
md.Spec.(proto.Message))
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+ propID := prop.GetId()
+ if r.cache.Delete(propID, resp.GetDeleteTime()) {
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert deleted property")
+ return
+ }
+ r.notifyHandlers(kind, md, true)
+ }
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+ // handled by processWatchSession, not expected here
}
- return resp.GetNames(), nil
}
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
- return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ return
+ }
+ maxRev := r.cache.GetMaxRevision()
+ req := &syncRequest{
+ criteria: buildRevisionCriteria(maxRev),
+ }
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync:
requesting changes")
+ r.sendSyncRequest(ctx, sessions, req)
}
-type kindGroupPair struct {
- groupName string
- kind schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ sessions := make(map[string]*watchSession, len(r.watchSessions))
+ for name, s := range r.watchSessions {
+ sessions[name] = s
+ }
+ return sessions
}
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc
*schemaClient) error {
- groups, listErr := r.listGroupFromClient(ctx, sc.management)
- if listErr != nil {
- return listErr
- }
- var pairs []kindGroupPair
- for _, group := range groups {
- r.processInitialResource(schema.KindGroup, group)
- catalog := group.GetCatalog()
- groupName := group.GetMetadata().GetName()
- switch catalog {
- case commonv1.Catalog_CATALOG_STREAM:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindStream},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_MEASURE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindMeasure},
- kindGroupPair{groupName: groupName, kind:
schema.KindTopNAggregation},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_TRACE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindTrace},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_PROPERTY:
- pairs = append(pairs, kindGroupPair{groupName:
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+ sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+ r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest:
dispatching to watch sessions")
+ for _, s := range sessions {
+ select {
+ case s.syncReqCh <- req:
default:
- r.l.Error().Stringer("catalog", catalog).Str("group",
groupName).Msg("unknown catalog type during initialization")
}
}
- var wg sync.WaitGroup
- for _, pair := range pairs {
- currentPair := pair
- wg.Add(1)
- go func() {
- defer wg.Done()
- r.initResourcesFromClient(ctx, sc.management,
currentPair.kind, currentPair.groupName)
- }()
+ allDigests := make(map[string][]*digestEntry, len(sessions))
+ for name, s := range sessions {
+ select {
+ case digests := <-s.syncResCh:
+ allDigests[name] = digests
+ case <-time.After(30 * time.Second):
+ r.l.Warn().Str("node", name).Msg("sync timeout")
+ case <-ctx.Done():
+ return nil
+ }
Review Comment:
sendSyncRequest drops a sync request when syncReqCh is full (default
branch), but still waits for a response on syncResCh. If the send is dropped,
this call will block until the 30s timeout for that session, stalling the sync
loop and delaying reconciliation. Consider guaranteeing delivery (block until
sent / overwrite previous pending request) or tracking which sessions actually
received the request and only waiting for those.
##########
banyand/metadata/schema/schemaserver/watcher.go:
##########
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "sync"
+
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const watcherChannelSize = 512
+
+type watcherManager struct {
+ l *logger.Logger
+ watchers map[uint64]chan *schemav1.WatchSchemasResponse
+ mu sync.RWMutex
+ nextID uint64
+}
+
+func newWatcherManager(l *logger.Logger) *watcherManager {
+ return &watcherManager{
+ l: l,
+ watchers: make(map[uint64]chan *schemav1.WatchSchemasResponse),
+ }
+}
+
+// Subscribe registers a new watcher and returns its ID and event channel.
+func (wm *watcherManager) Subscribe() (uint64, <-chan
*schemav1.WatchSchemasResponse) {
+ wm.mu.Lock()
+ defer wm.mu.Unlock()
+ wm.nextID++
+ id := wm.nextID
+ ch := make(chan *schemav1.WatchSchemasResponse, watcherChannelSize)
+ wm.watchers[id] = ch
+ wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers",
len(wm.watchers)).Msg("watcher subscribed")
+ return id, ch
+}
+
+// Unsubscribe removes a watcher and closes its channel.
+func (wm *watcherManager) Unsubscribe(id uint64) {
+ wm.mu.Lock()
+ defer wm.mu.Unlock()
+ if ch, ok := wm.watchers[id]; ok {
+ delete(wm.watchers, id)
+ close(ch)
+ wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers",
len(wm.watchers)).Msg("watcher unsubscribed")
+ }
+}
+
+// Broadcast sends an event to all watchers without blocking.
+func (wm *watcherManager) Broadcast(resp *schemav1.WatchSchemasResponse) {
+ wm.mu.RLock()
+ defer wm.mu.RUnlock()
+ wm.l.Debug().Stringer("eventType", resp.GetEventType()).
+ Str("propertyID", resp.GetProperty().GetId()).
+ Int("watcherCount", len(wm.watchers)).
+ Msg("broadcasting schema event")
+ for id, ch := range wm.watchers {
+ select {
+ case ch <- resp:
+ default:
+ wm.l.Warn().Uint64("watcherID", id).
+ Stringer("eventType", resp.GetEventType()).
+ Str("propertyID", resp.GetProperty().GetId()).
+ Msg("watcher channel full, dropping event")
+ }
Review Comment:
watcherManager.Broadcast logs resp.GetProperty().GetId() for every event,
including responses with no Property (e.g., REPLAY_DONE or an empty response,
see watcher_test). Note that protoc-generated Go getters are nil-safe —
GetProperty() returns nil and GetId() on a nil *Property returns "" — so this
does not panic; the cost is a misleading empty propertyID field in the log.
Consider omitting the propertyID field (or logging an explicit placeholder)
when resp.GetProperty() is nil so the broadcast logs stay unambiguous.
##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context)
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active
connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing
existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch
session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for
full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling
watch session")
+ session.cancelFn()
+ delete(r.watchSessions, nodeName)
}
}
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector
*syncCollector) {
- sinceRevision := r.cache.GetMaxRevision()
- r.l.Debug().Int64("sinceRevision",
sinceRevision).Msg("collectIncrementalSync: querying updates")
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx,
sc.update, sinceRevision)
- if queryErr != nil {
- return queryErr
const (
	// watchInitialBackoff is the delay before the first reconnect attempt
	// after a watch stream disconnects.
	watchInitialBackoff = time.Second
	// watchMaxBackoff caps the exponential reconnect backoff.
	watchMaxBackoff = 30 * time.Second
)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+ backoff := watchInitialBackoff
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
}
- r.l.Debug().Str("node", nodeName).Int("updatedKinds",
len(updatedKindNames)).
- Strs("kinds",
updatedKindNames).Msg("collectIncrementalSync: got updates")
- for _, kindName := range updatedKindNames {
- kind, kindErr := KindFromString(kindName)
- if kindErr != nil {
- r.l.Warn().Str("kindName",
kindName).Msg("unknown kind from aggregate update")
- continue
- }
- query := buildSchemaQuery(kind, "", "", sinceRevision)
- schemas, schemasErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if schemasErr != nil {
- return fmt.Errorf("failed to query updated
schemas for kind %s: %w", kind, schemasErr)
+ err := r.processWatchSession(ctx, updateClient, session)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch:
server does not support WatchSchemas, stopping watch")
+ return
}
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas",
len(schemas)).Msg("collectIncrementalSync: collected")
- collector.add(nodeName, kind, schemas)
+ r.l.Warn().Err(err).Str("node",
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ }
+ backoff *= 2
+ if backoff > watchMaxBackoff {
+ backoff = watchMaxBackoff
}
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect
incremental sync from some nodes")
}
}
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
- for kind, propMap := range collector.entries {
- cachedEntries := r.cache.GetEntriesByKind(kind)
- seen := make(map[string]bool, len(propMap))
- for propID, s := range propMap {
- seen[propID] = true
- if s.deleteTime > 0 {
- if entry, exists := cachedEntries[propID];
exists {
- r.handleDeletion(kind, propID, entry,
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+ stream, err := updateClient.WatchSchemas(ctx)
+ if err != nil {
+ return err
+ }
+
+ maxRev := r.cache.GetMaxRevision()
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession:
sending initial replay request")
+ initReq := &schemav1.WatchSchemasRequest{
+ Criteria: buildRevisionCriteria(maxRev),
+ }
+ if sendErr := stream.Send(initReq); sendErr != nil {
+ return sendErr
+ }
+
+ recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+ recvErrCh := make(chan error, 1)
+ go func() {
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ recvErrCh <- recvErr
+ close(recvCh)
+ return
+ }
+ recvCh <- resp
+ }
+ }()
+
+ var inSync bool
+ var metadataOnly bool
+ var digests []*digestEntry
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-recvErrCh:
+ return err
+ case req := <-session.syncReqCh:
+ r.l.Debug().Bool("metadataOnly", len(req.tagProjection)
> 0).
+ Bool("hasCriteria", req.criteria !=
nil).Msg("processWatchSession: sending sync request")
+ if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+ Criteria: req.criteria,
+ TagProjection: req.tagProjection,
+ }); sendErr != nil {
+ return sendErr
+ }
+ inSync = true
+ metadataOnly = len(req.tagProjection) > 0
+ digests = nil
+ case resp, ok := <-recvCh:
+ if !ok {
+ return <-recvErrCh
+ }
+ if resp.EventType ==
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+ r.l.Debug().Bool("inSync",
inSync).Msg("processWatchSession: received REPLAY_DONE")
+ if inSync {
+ r.l.Debug().Int("digestCount",
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+ session.syncResCh <- digests
+ digests = nil
+ inSync = false
+ metadataOnly = false
}
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Str("propID",
propID).Msg("failed to convert property for reconcile")
- continue
+ if inSync && metadataOnly {
+ digests = append(digests, r.parseDigest(resp))
+ } else {
+ r.handleWatchEvent(resp)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
}
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
- r.mux.RLock()
- handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
- for kind := range r.handlers {
- handlerKindSet[kind] = struct{}{}
- }
- r.mux.RUnlock()
- for _, kind := range r.cache.GetCachedKinds() {
- handlerKindSet[kind] = struct{}{}
- }
- kinds := make([]schema.Kind, 0, len(handlerKindSet))
- for kind := range handlerKindSet {
- kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse)
*digestEntry {
+ prop := resp.GetProperty()
+ parsed := ParseTags(prop.GetTags())
+ return &digestEntry{
+ propID: prop.GetId(),
+ kind: parsed.Kind,
+ group: parsed.Group,
+ name: parsed.Name,
+ revision: parsed.UpdatedAt,
+ deleteTime: resp.GetDeleteTime(),
}
- return kinds
}
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
- query := buildUpdatedSchemasQuery(sinceRevision)
- resp, rpcErr := client.AggregateSchemaUpdates(ctx,
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
- if rpcErr != nil {
- return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse)
{
+ prop := resp.GetProperty()
+ if prop == nil {
+ return
+ }
+ parsed := ParseTags(prop.GetTags())
+ kindStr := parsed.Kind
+ if kindStr == "" {
+ kindStr = prop.GetMetadata().GetName()
+ }
+ kind, kindErr := KindFromString(kindStr)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in
event")
+ return
+ }
+ r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind",
kind).
+ Str("propID", prop.GetId()).Msg("watch: received event")
+ switch resp.GetEventType() {
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert property")
+ return
+ }
+ r.processInitialResourceFromProperty(kind, prop,
md.Spec.(proto.Message))
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+ propID := prop.GetId()
+ if r.cache.Delete(propID, resp.GetDeleteTime()) {
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert deleted property")
+ return
+ }
+ r.notifyHandlers(kind, md, true)
+ }
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+ // handled by processWatchSession, not expected here
}
- return resp.GetNames(), nil
}
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
- return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ return
+ }
+ maxRev := r.cache.GetMaxRevision()
+ req := &syncRequest{
+ criteria: buildRevisionCriteria(maxRev),
+ }
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync:
requesting changes")
+ r.sendSyncRequest(ctx, sessions, req)
}
-type kindGroupPair struct {
- groupName string
- kind schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ sessions := make(map[string]*watchSession, len(r.watchSessions))
+ for name, s := range r.watchSessions {
+ sessions[name] = s
+ }
+ return sessions
}
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc
*schemaClient) error {
- groups, listErr := r.listGroupFromClient(ctx, sc.management)
- if listErr != nil {
- return listErr
- }
- var pairs []kindGroupPair
- for _, group := range groups {
- r.processInitialResource(schema.KindGroup, group)
- catalog := group.GetCatalog()
- groupName := group.GetMetadata().GetName()
- switch catalog {
- case commonv1.Catalog_CATALOG_STREAM:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindStream},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_MEASURE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindMeasure},
- kindGroupPair{groupName: groupName, kind:
schema.KindTopNAggregation},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_TRACE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindTrace},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_PROPERTY:
- pairs = append(pairs, kindGroupPair{groupName:
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+ sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+ r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest:
dispatching to watch sessions")
+ for _, s := range sessions {
+ select {
+ case s.syncReqCh <- req:
default:
- r.l.Error().Stringer("catalog", catalog).Str("group",
groupName).Msg("unknown catalog type during initialization")
}
}
- var wg sync.WaitGroup
- for _, pair := range pairs {
- currentPair := pair
- wg.Add(1)
- go func() {
- defer wg.Done()
- r.initResourcesFromClient(ctx, sc.management,
currentPair.kind, currentPair.groupName)
- }()
+ allDigests := make(map[string][]*digestEntry, len(sessions))
+ for name, s := range sessions {
+ select {
+ case digests := <-s.syncResCh:
+ allDigests[name] = digests
+ case <-time.After(30 * time.Second):
+ r.l.Warn().Str("node", name).Msg("sync timeout")
+ case <-ctx.Done():
+ return nil
+ }
}
- wg.Wait()
- return nil
+ return allDigests
}
-func (r *SchemaRegistry) listGroupFromClient(ctx context.Context, client
schemav1.SchemaManagementServiceClient) ([]*commonv1.Group, error) {
- query := buildSchemaQuery(schema.KindGroup, "", "", 0)
- results, queryErr := r.querySchemasFromClient(ctx, client, query)
- if queryErr != nil {
- return nil, queryErr
+func (r *SchemaRegistry) performFullSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ r.l.Debug().Msg("performFullSync: no active watch sessions,
skipping")
+ return
}
- var groups []*commonv1.Group
- for _, s := range results {
- if s.deleteTime > 0 {
- continue
- }
- md, convErr := ToSchema(schema.KindGroup, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Msg("failed to convert group
property")
- continue
- }
- g, ok := md.Spec.(*commonv1.Group)
- if !ok {
- continue
- }
- groups = append(groups, g)
+ req := &syncRequest{
+ tagProjection: basicTagKeys,
}
- return groups, nil
-}
-
-func (r *SchemaRegistry) initResourcesFromClient(ctx context.Context,
- client schemav1.SchemaManagementServiceClient, kind schema.Kind,
groupName string,
-) {
- query := buildSchemaQuery(kind, groupName, "", 0)
- results, queryErr := r.querySchemasFromClient(ctx, client, query)
- if queryErr != nil {
- r.l.Warn().Err(queryErr).Stringer("kind", kind).Str("group",
groupName).Msg("failed to list resources for init")
+ allDigests := r.sendSyncRequest(ctx, sessions, req)
+ if allDigests == nil {
return
}
- for _, s := range results {
- if s.deleteTime > 0 {
+
+ merged := r.mergeDigests(allDigests)
+ cachedEntries := r.cache.GetAllEntries()
+ r.l.Debug().Int("mergedCount", len(merged)).Int("cachedCount",
len(cachedEntries)).
+ Msg("performFullSync: comparing server digests with local
cache")
+
+ for propID, d := range merged {
+ if d.deleteTime > 0 {
+ if entry, exists := cachedEntries[propID]; exists {
+ kind, kindErr := KindFromString(d.kind)
+ if kindErr != nil {
+ r.l.Warn().Str("kind",
d.kind).Msg("full sync: unknown kind")
+ continue
+ }
+ r.handleDeletion(kind, propID, entry,
d.revision)
+ }
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("failed to convert property for init")
- continue
+ localEntry := cachedEntries[propID]
+ if localEntry == nil || localEntry.latestUpdateAt < d.revision {
+ kind, kindErr := KindFromString(d.kind)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", d.kind).Msg("full sync:
unknown kind")
+ continue
+ }
+ r.l.Debug().Str("propID", propID).Stringer("kind",
kind).
+ Int64("serverRevision",
d.revision).Msg("performFullSync: fetching updated schema from server")
+ prop, err := r.getSchema(ctx, kind, d.group, d.name)
+ if err != nil {
+ r.l.Warn().Err(err).Str("propID",
propID).Msg("full sync: failed to get schema")
+ continue
+ }
+ if prop == nil {
+ continue
+ }
+ md, err := ToSchema(kind, prop)
+ if err != nil {
+ r.l.Warn().Err(err).Str("propID",
propID).Msg("full sync: failed to convert property")
+ continue
+ }
+ r.processInitialResourceFromProperty(kind, prop,
md.Spec.(proto.Message))
+ }
+ }
+
+ for propID, entry := range cachedEntries {
+ if _, exists := merged[propID]; !exists {
+ r.l.Debug().Str("propID", propID).Stringer("kind",
entry.kind).
+ Msg("performFullSync: cached entry not found on
server, deleting")
+ r.handleDeletion(entry.kind, propID, entry,
entry.latestUpdateAt)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
-func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec
proto.Message) {
- prop, convErr := SchemaToProperty(kind, spec)
- if convErr != nil {
- r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to
convert spec to property for initial resource")
- return
+func (r *SchemaRegistry) mergeDigests(allDigests map[string][]*digestEntry)
map[string]*digestEntry {
+ merged := make(map[string]*digestEntry)
+ for _, digests := range allDigests {
+ for _, d := range digests {
+ existing, exists := merged[d.propID]
+ if !exists || d.revision > existing.revision {
+ merged[d.propID] = d
+ }
+ }
Review Comment:
mergeDigests only prefers the highest revision; when revisions are equal
(common for deletes since updated_at doesn't change), the selected digest is
nondeterministic due to map iteration order and may flip between
deleted/non-deleted across runs. Add a deterministic tie-breaker (e.g., for
equal revision prefer non-deleted, or prefer the larger deleteTime) to avoid
flakiness and incorrect deletions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]