mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909954608
##########
banyand/metadata/schema/property/client.go:
##########
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round", r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round", r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling watch session")
+ session.cancelFn()
+ delete(r.watchSessions, nodeName)
}
}
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector
*syncCollector) {
- sinceRevision := r.cache.GetMaxRevision()
- r.l.Debug().Int64("sinceRevision",
sinceRevision).Msg("collectIncrementalSync: querying updates")
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx,
sc.update, sinceRevision)
- if queryErr != nil {
- return queryErr
+const (
+ watchInitialBackoff = time.Second
+ watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+ backoff := watchInitialBackoff
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
}
- r.l.Debug().Str("node", nodeName).Int("updatedKinds",
len(updatedKindNames)).
- Strs("kinds",
updatedKindNames).Msg("collectIncrementalSync: got updates")
- for _, kindName := range updatedKindNames {
- kind, kindErr := KindFromString(kindName)
- if kindErr != nil {
- r.l.Warn().Str("kindName",
kindName).Msg("unknown kind from aggregate update")
- continue
- }
- query := buildSchemaQuery(kind, "", "", sinceRevision)
- schemas, schemasErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if schemasErr != nil {
- return fmt.Errorf("failed to query updated schemas for kind %s: %w", kind, schemasErr)
+ err := r.processWatchSession(ctx, updateClient, session)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch: server does not support WatchSchemas, stopping watch")
+ return
}
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas",
len(schemas)).Msg("collectIncrementalSync: collected")
- collector.add(nodeName, kind, schemas)
+ r.l.Warn().Err(err).Str("node", nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ }
+ backoff *= 2
+ if backoff > watchMaxBackoff {
+ backoff = watchMaxBackoff
}
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect incremental sync from some nodes")
}
}
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
- for kind, propMap := range collector.entries {
- cachedEntries := r.cache.GetEntriesByKind(kind)
- seen := make(map[string]bool, len(propMap))
- for propID, s := range propMap {
- seen[propID] = true
- if s.deleteTime > 0 {
- if entry, exists := cachedEntries[propID];
exists {
- r.handleDeletion(kind, propID, entry,
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+ stream, err := updateClient.WatchSchemas(ctx)
+ if err != nil {
+ return err
+ }
+
+ maxRev := r.cache.GetMaxRevision()
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession: sending initial replay request")
+ initReq := &schemav1.WatchSchemasRequest{
+ Criteria: buildRevisionCriteria(maxRev),
+ }
+ if sendErr := stream.Send(initReq); sendErr != nil {
+ return sendErr
+ }
+
+ recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+ recvErrCh := make(chan error, 1)
+ go func() {
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ recvErrCh <- recvErr
+ close(recvCh)
+ return
+ }
+ recvCh <- resp
+ }
+ }()
+
+ var inSync bool
+ var metadataOnly bool
+ var digests []*digestEntry
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-recvErrCh:
+ return err
+ case req := <-session.syncReqCh:
+ r.l.Debug().Bool("metadataOnly", len(req.tagProjection) > 0).
+ Bool("hasCriteria", req.criteria != nil).Msg("processWatchSession: sending sync request")
+ if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+ Criteria: req.criteria,
+ TagProjection: req.tagProjection,
+ }); sendErr != nil {
+ return sendErr
+ }
+ inSync = true
+ metadataOnly = len(req.tagProjection) > 0
+ digests = nil
+ case resp, ok := <-recvCh:
+ if !ok {
+ return <-recvErrCh
+ }
+ if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+ r.l.Debug().Bool("inSync", inSync).Msg("processWatchSession: received REPLAY_DONE")
+ if inSync {
+ r.l.Debug().Int("digestCount", len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+ session.syncResCh <- digests
+ digests = nil
+ inSync = false
+ metadataOnly = false
}
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Str("propID",
propID).Msg("failed to convert property for reconcile")
- continue
+ if inSync && metadataOnly {
+ digests = append(digests, r.parseDigest(resp))
+ } else {
+ r.handleWatchEvent(resp)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
}
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
- r.mux.RLock()
- handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
- for kind := range r.handlers {
- handlerKindSet[kind] = struct{}{}
- }
- r.mux.RUnlock()
- for _, kind := range r.cache.GetCachedKinds() {
- handlerKindSet[kind] = struct{}{}
- }
- kinds := make([]schema.Kind, 0, len(handlerKindSet))
- for kind := range handlerKindSet {
- kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse) *digestEntry {
+ prop := resp.GetProperty()
+ parsed := ParseTags(prop.GetTags())
+ return &digestEntry{
+ propID: prop.GetId(),
+ kind: parsed.Kind,
+ group: parsed.Group,
+ name: parsed.Name,
+ revision: parsed.UpdatedAt,
+ deleteTime: resp.GetDeleteTime(),
}
- return kinds
}
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
- query := buildUpdatedSchemasQuery(sinceRevision)
- resp, rpcErr := client.AggregateSchemaUpdates(ctx,
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
- if rpcErr != nil {
- return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) {
+ prop := resp.GetProperty()
+ if prop == nil {
+ return
+ }
+ parsed := ParseTags(prop.GetTags())
+ kindStr := parsed.Kind
+ if kindStr == "" {
+ kindStr = prop.GetMetadata().GetName()
+ }
+ kind, kindErr := KindFromString(kindStr)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in event")
+ return
+ }
+ r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind", kind).
+ Str("propID", prop.GetId()).Msg("watch: received event")
+ switch resp.GetEventType() {
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("watch: failed to convert property")
+ return
+ }
+ r.processInitialResourceFromProperty(kind, prop, md.Spec.(proto.Message))
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+ propID := prop.GetId()
+ if r.cache.Delete(propID, resp.GetDeleteTime()) {
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("watch: failed to convert deleted property")
+ return
+ }
+ r.notifyHandlers(kind, md, true)
+ }
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE, schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+ // handled by processWatchSession, not expected here
}
- return resp.GetNames(), nil
}
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
- return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ return
+ }
+ maxRev := r.cache.GetMaxRevision()
+ req := &syncRequest{
+ criteria: buildRevisionCriteria(maxRev),
+ }
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync: requesting changes")
+ r.sendSyncRequest(ctx, sessions, req)
}
-type kindGroupPair struct {
- groupName string
- kind schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ sessions := make(map[string]*watchSession, len(r.watchSessions))
+ for name, s := range r.watchSessions {
+ sessions[name] = s
+ }
+ return sessions
}
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc
*schemaClient) error {
- groups, listErr := r.listGroupFromClient(ctx, sc.management)
- if listErr != nil {
- return listErr
- }
- var pairs []kindGroupPair
- for _, group := range groups {
- r.processInitialResource(schema.KindGroup, group)
- catalog := group.GetCatalog()
- groupName := group.GetMetadata().GetName()
- switch catalog {
- case commonv1.Catalog_CATALOG_STREAM:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindStream},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_MEASURE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindMeasure},
- kindGroupPair{groupName: groupName, kind:
schema.KindTopNAggregation},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_TRACE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindTrace},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_PROPERTY:
- pairs = append(pairs, kindGroupPair{groupName:
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+ sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+ r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest: dispatching to watch sessions")
+ for _, s := range sessions {
+ select {
+ case s.syncReqCh <- req:
default:
- r.l.Error().Stringer("catalog", catalog).Str("group",
groupName).Msg("unknown catalog type during initialization")
}
}
- var wg sync.WaitGroup
- for _, pair := range pairs {
- currentPair := pair
- wg.Add(1)
- go func() {
- defer wg.Done()
- r.initResourcesFromClient(ctx, sc.management,
currentPair.kind, currentPair.groupName)
- }()
+ allDigests := make(map[string][]*digestEntry, len(sessions))
+ for name, s := range sessions {
+ select {
+ case digests := <-s.syncResCh:
+ allDigests[name] = digests
+ case <-time.After(30 * time.Second):
+ r.l.Warn().Str("node", name).Msg("sync timeout")
+ case <-ctx.Done():
+ return nil
+ }
Review Comment:
Added a skip log when the channel is full.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]