This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e1ba421bd Support client watch the schema server updates (#995)
e1ba421bd is described below
commit e1ba421bd624727760c7a69c84c6fe55878fb526
Author: mrproliu <[email protected]>
AuthorDate: Tue Mar 10 16:18:16 2026 +0800
Support client watch the schema server updates (#995)
* Support client watch the schema server updates
---
api/proto/banyandb/schema/v1/internal.proto | 26 +-
banyand/metadata/schema/property/client.go | 657 +++++++++++----------
banyand/metadata/schema/property/client_test.go | 511 +++++++++++-----
banyand/metadata/schema/property/converter.go | 67 ++-
banyand/metadata/schema/property/converter_test.go | 35 +-
banyand/metadata/schema/property/schema_client.go | 9 +-
banyand/metadata/schema/schemaserver/grpc.go | 164 ++++-
banyand/metadata/schema/schemaserver/grpc_test.go | 169 +++++-
banyand/metadata/schema/schemaserver/service.go | 27 +-
banyand/metadata/schema/schemaserver/watcher.go | 105 ++++
.../metadata/schema/schemaserver/watcher_test.go | 115 ++++
docs/api-reference.md | 87 +--
pkg/test/setup/setup.go | 6 +-
.../cases/cluster/docker-compose-property.yml | 4 +-
.../event/banyandb/docker-compose-property.yml | 2 +-
.../trace/banyandb/docker-compose-property.yml | 2 +-
.../storage/banyandb/docker-compose-property.yml | 2 +-
.../distributed/lifecycle/property/suite_test.go | 1 -
18 files changed, 1389 insertions(+), 600 deletions(-)
diff --git a/api/proto/banyandb/schema/v1/internal.proto
b/api/proto/banyandb/schema/v1/internal.proto
index f753411ff..9d5908d1c 100644
--- a/api/proto/banyandb/schema/v1/internal.proto
+++ b/api/proto/banyandb/schema/v1/internal.proto
@@ -19,6 +19,7 @@ syntax = "proto3";
package banyandb.schema.v1;
+import "banyandb/model/v1/query.proto";
import "banyandb/property/v1/property.proto";
import "banyandb/property/v1/rpc.proto";
import "google/protobuf/timestamp.proto";
@@ -74,15 +75,26 @@ service SchemaManagementService {
rpc RepairSchema(RepairSchemaRequest) returns (RepairSchemaResponse);
}
-message AggregateSchemaUpdatesRequest {
- property.v1.QueryRequest query = 1;
+service SchemaUpdateService {
+ rpc WatchSchemas(stream WatchSchemasRequest) returns (stream
WatchSchemasResponse);
}
-message AggregateSchemaUpdatesResponse {
- // which names/schemas has updates
- repeated string names = 1;
+enum SchemaEventType {
+ SCHEMA_EVENT_TYPE_UNSPECIFIED = 0;
+ SCHEMA_EVENT_TYPE_INSERT = 1;
+ SCHEMA_EVENT_TYPE_UPDATE = 2;
+ SCHEMA_EVENT_TYPE_DELETE = 3;
+ SCHEMA_EVENT_TYPE_REPLAY_DONE = 4;
}
-service SchemaUpdateService {
- rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns
(AggregateSchemaUpdatesResponse);
+message WatchSchemasRequest {
+ model.v1.Criteria criteria = 1;
+ repeated string tag_projection = 2;
+}
+
+message WatchSchemasResponse {
+ SchemaEventType event_type = 1;
+ property.v1.Property property = 2;
+ bool metadata_only = 3;
+ int64 delete_time = 4;
}
diff --git a/banyand/metadata/schema/property/client.go
b/banyand/metadata/schema/property/client.go
index 733715de9..19b6d42fe 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -34,6 +34,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
"github.com/apache/skywalking-banyandb/api/validate"
@@ -73,6 +74,29 @@ type ClientConfig struct {
TLSEnabled bool
}
+type syncRequest struct {
+ criteria *modelv1.Criteria
+ tagProjection []string
+}
+
+type watchSession struct {
+ cancelFn context.CancelFunc
+ syncReqCh chan *syncRequest
+ syncResCh chan []*digestEntry
+}
+
+type digestEntry struct {
+ propID string
+ kind string
+ group string
+ name string
+ revision int64
+ deleteTime int64
+}
+
+// DefaultFullReconcileEvery is the default number of sync rounds between full
reconciliations.
+const DefaultFullReconcileEvery = 5
+
// SchemaRegistry implements schema.Registry using property-based schema
servers.
type SchemaRegistry struct {
connMgr *grpchelper.ConnManager[*schemaClient]
@@ -81,11 +105,12 @@ type SchemaRegistry struct {
cache *schemaCache
caCertReloader *pkgtls.Reloader
handlers map[schema.Kind][]schema.EventHandler
+ watchSessions map[string]*watchSession
syncInterval time.Duration
fullReconcileEvery uint64
syncRound uint64
- started atomic.Bool
mux sync.RWMutex
+ watchMu sync.Mutex
}
// NewSchemaRegistryClient creates a new property-based schema registry client.
@@ -118,8 +143,8 @@ func NewSchemaRegistryClient(cfg *ClientConfig)
(*SchemaRegistry, error) {
initWaitTime = DefaultInitWaitTime
}
fullReconcileEvery := cfg.FullReconcileEvery
- if fullReconcileEvery == 0 {
- fullReconcileEvery = 5
+ if fullReconcileEvery <= 0 {
+ fullReconcileEvery = DefaultFullReconcileEvery
}
reg := &SchemaRegistry{
connMgr: connMgr,
@@ -128,9 +153,11 @@ func NewSchemaRegistryClient(cfg *ClientConfig)
(*SchemaRegistry, error) {
cache: newSchemaCache(),
caCertReloader: caCertReloader,
handlers: make(map[schema.Kind][]schema.EventHandler),
+ watchSessions: make(map[string]*watchSession),
syncInterval: syncInterval,
fullReconcileEvery: fullReconcileEvery,
}
+ handler.registry = reg
if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
connMgr.OnAddOrUpdate(cfg.CurNode)
@@ -258,64 +285,11 @@ func (r *SchemaRegistry) broadcastInsert(ctx
context.Context, prop *propertyv1.P
return nil
}
-func (r *SchemaRegistry) forEachActiveNode(fn func(nodeName string, c
*schemaClient) error, errMsg string) {
- names := r.connMgr.ActiveNames()
- for _, nodeName := range names {
- currentNode := nodeName
- execErr := r.connMgr.Execute(currentNode, func(c *schemaClient)
error {
- return fn(currentNode, c)
- })
- if execErr != nil {
- r.l.Warn().Err(execErr).Str("node",
currentNode).Msg(errMsg)
- }
- }
-}
-
type schemaWithDeleteTime struct {
property *propertyv1.Property
deleteTime int64
}
-type syncCollector struct {
- entries map[schema.Kind]map[string]*schemaWithDeleteTime
- queriedServers map[schema.Kind]map[string]bool
- mu sync.Mutex
-}
-
-func newSyncCollector() *syncCollector {
- return &syncCollector{
- entries:
make(map[schema.Kind]map[string]*schemaWithDeleteTime),
- queriedServers: make(map[schema.Kind]map[string]bool),
- }
-}
-
-func (c *syncCollector) add(serverName string, kind schema.Kind, schemas
[]*schemaWithDeleteTime) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.entries[kind] == nil {
- c.entries[kind] = make(map[string]*schemaWithDeleteTime)
- }
- if c.queriedServers[kind] == nil {
- c.queriedServers[kind] = make(map[string]bool)
- }
- c.queriedServers[kind][serverName] = true
- for _, s := range schemas {
- propID := s.property.GetId()
- existing, exists := c.entries[kind][propID]
- if !exists {
- c.entries[kind][propID] = s
- continue
- }
- existingRev := ParseTags(existing.property.GetTags()).UpdatedAt
- newRev := ParseTags(s.property.GetTags()).UpdatedAt
- // Prefer higher revision. If same revision, prefer non-deleted
over deleted.
- if newRev > existingRev ||
- newRev == existingRev && existing.deleteTime > 0 &&
s.deleteTime == 0 {
- c.entries[kind][propID] = s
- }
- }
-}
-
type propInfo struct {
nodeRev map[string]int64
nodeDelTime map[string]int64
@@ -443,23 +417,9 @@ func (r *SchemaRegistry) getSchema(ctx context.Context,
kind schema.Kind,
}
propID := query.Ids[0]
info := propMap[propID]
- if !r.started.Load() {
- if info == nil || info.best == nil || info.best.deleteTime > 0 {
- return nil, nil
- }
- return info.best.property, nil
- }
if info == nil || info.best == nil || info.best.deleteTime > 0 {
- entry := r.cache.Get(propID)
- if entry != nil {
- r.handleDeletion(kind, propID, entry,
entry.latestUpdateAt)
- }
return nil, nil
}
- md, convErr := ToSchema(kind, info.best.property)
- if convErr == nil {
- r.processInitialResourceFromProperty(kind, info.best.property,
md.Spec.(proto.Message))
- }
return info.best.property, nil
}
@@ -471,39 +431,10 @@ func (r *SchemaRegistry) listSchemas(ctx context.Context,
kind schema.Kind,
if queryErr != nil {
return nil, queryErr
}
- if !r.started.Load() {
- result := make([]*propertyv1.Property, 0, len(propMap))
- for _, info := range propMap {
- if info.best != nil && info.best.deleteTime == 0 {
- result = append(result, info.best.property)
- }
- }
- return result, nil
- }
- serverPropIDs := make(map[string]struct{})
result := make([]*propertyv1.Property, 0, len(propMap))
- for propID, info := range propMap {
- if info.best == nil || info.best.deleteTime != 0 {
- entry := r.cache.Get(propID)
- if entry != nil {
- r.handleDeletion(kind, propID, entry,
entry.latestUpdateAt)
- }
- continue
- }
- serverPropIDs[propID] = struct{}{}
- md, convErr := ToSchema(kind, info.best.property)
- if convErr == nil {
- r.processInitialResourceFromProperty(kind,
info.best.property, md.Spec.(proto.Message))
- }
- result = append(result, info.best.property)
- }
- cachedEntries := r.cache.GetEntriesByKind(kind)
- for propID, entry := range cachedEntries {
- if group != "" && entry.group != group {
- continue
- }
- if _, found := serverPropIDs[propID]; !found {
- r.handleDeletion(kind, propID, entry,
entry.latestUpdateAt)
+ for _, info := range propMap {
+ if info.best != nil && info.best.deleteTime == 0 {
+ result = append(result, info.best.property)
}
}
return result, nil
@@ -1041,10 +972,10 @@ func (r *SchemaRegistry) Start(ctx context.Context)
error {
}
}
}
- r.forEachActiveNode(func(_ string, sc *schemaClient) error {
- return r.initializeFromSchemaClient(ctx, sc)
- }, "failed to initialize from schema client at startup")
- r.started.Store(true)
+ // Replay all cached entries to handlers. Watch replay (launched during
+ // OnActive in PreRun) may have populated the cache before handlers were
+ // registered. This explicit replay ensures handlers see every entry.
+ r.notifyHandlersFromCache()
go r.syncLoop(ctx)
return nil
}
@@ -1063,260 +994,349 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context)
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active
connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing
existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch
session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for
full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("stopWatch: canceling
watch session")
+ session.cancelFn()
+ delete(r.watchSessions, nodeName)
}
}
-func (r *SchemaRegistry) collectIncrementalSync(ctx context.Context, collector
*syncCollector) {
- sinceRevision := r.cache.GetMaxRevision()
- r.l.Debug().Int64("sinceRevision",
sinceRevision).Msg("collectIncrementalSync: querying updates")
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- updatedKindNames, queryErr := r.queryUpdatedSchemas(ctx,
sc.update, sinceRevision)
- if queryErr != nil {
- return queryErr
+const (
+ watchInitialBackoff = time.Second
+ watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(ctx context.Context, nodeName string,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) {
+ backoff := watchInitialBackoff
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
}
- r.l.Debug().Str("node", nodeName).Int("updatedKinds",
len(updatedKindNames)).
- Strs("kinds",
updatedKindNames).Msg("collectIncrementalSync: got updates")
- for _, kindName := range updatedKindNames {
- kind, kindErr := KindFromString(kindName)
- if kindErr != nil {
- r.l.Warn().Str("kindName",
kindName).Msg("unknown kind from aggregate update")
- continue
- }
- query := buildSchemaQuery(kind, "", "", sinceRevision)
- schemas, schemasErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if schemasErr != nil {
- return fmt.Errorf("failed to query updated
schemas for kind %s: %w", kind, schemasErr)
+ err := r.processWatchSession(ctx, updateClient, session)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch:
server does not support WatchSchemas, stopping watch")
+ return
}
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas",
len(schemas)).Msg("collectIncrementalSync: collected")
- collector.add(nodeName, kind, schemas)
+ r.l.Warn().Err(err).Str("node",
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ }
+ backoff *= 2
+ if backoff > watchMaxBackoff {
+ backoff = watchMaxBackoff
}
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect
incremental sync from some nodes")
}
}
-func (r *SchemaRegistry) reconcileFromCollector(collector *syncCollector) {
- for kind, propMap := range collector.entries {
- cachedEntries := r.cache.GetEntriesByKind(kind)
- seen := make(map[string]bool, len(propMap))
- for propID, s := range propMap {
- seen[propID] = true
- if s.deleteTime > 0 {
- if entry, exists := cachedEntries[propID];
exists {
- r.handleDeletion(kind, propID, entry,
s.deleteTime)
+func (r *SchemaRegistry) processWatchSession(ctx context.Context,
+ updateClient schemav1.SchemaUpdateServiceClient, session *watchSession,
+) error {
+ stream, err := updateClient.WatchSchemas(ctx)
+ if err != nil {
+ return err
+ }
+
+ maxRev := r.cache.GetMaxRevision()
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("processWatchSession:
sending initial replay request")
+ initReq := &schemav1.WatchSchemasRequest{
+ Criteria: buildRevisionCriteria(maxRev),
+ }
+ if sendErr := stream.Send(initReq); sendErr != nil {
+ return sendErr
+ }
+
+ recvCh := make(chan *schemav1.WatchSchemasResponse, 64)
+ recvErrCh := make(chan error, 1)
+ go func() {
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ recvErrCh <- recvErr
+ close(recvCh)
+ return
+ }
+ recvCh <- resp
+ }
+ }()
+
+ var inSync bool
+ var metadataOnly bool
+ var digests []*digestEntry
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-recvErrCh:
+ return err
+ case req := <-session.syncReqCh:
+ r.l.Debug().Bool("metadataOnly", len(req.tagProjection)
> 0).
+ Bool("hasCriteria", req.criteria !=
nil).Msg("processWatchSession: sending sync request")
+ if sendErr := stream.Send(&schemav1.WatchSchemasRequest{
+ Criteria: req.criteria,
+ TagProjection: req.tagProjection,
+ }); sendErr != nil {
+ return sendErr
+ }
+ inSync = true
+ metadataOnly = len(req.tagProjection) > 0
+ digests = nil
+ case resp, ok := <-recvCh:
+ if !ok {
+ return <-recvErrCh
+ }
+ if resp.EventType ==
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+ r.l.Debug().Bool("inSync",
inSync).Msg("processWatchSession: received REPLAY_DONE")
+ if inSync {
+ r.l.Debug().Int("digestCount",
len(digests)).Msg("processWatchSession: sync replay completed, sending digests")
+ session.syncResCh <- digests
+ digests = nil
+ inSync = false
+ metadataOnly = false
}
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Str("propID",
propID).Msg("failed to convert property for reconcile")
- continue
+ if inSync && metadataOnly {
+ digests = append(digests, r.parseDigest(resp))
+ } else {
+ r.handleWatchEvent(resp)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
}
-func (r *SchemaRegistry) getKindsToSync() []schema.Kind {
- r.mux.RLock()
- handlerKindSet := make(map[schema.Kind]struct{}, len(r.handlers))
- for kind := range r.handlers {
- handlerKindSet[kind] = struct{}{}
- }
- r.mux.RUnlock()
- for _, kind := range r.cache.GetCachedKinds() {
- handlerKindSet[kind] = struct{}{}
- }
- kinds := make([]schema.Kind, 0, len(handlerKindSet))
- for kind := range handlerKindSet {
- kinds = append(kinds, kind)
+func (r *SchemaRegistry) parseDigest(resp *schemav1.WatchSchemasResponse)
*digestEntry {
+ prop := resp.GetProperty()
+ parsed := ParseTags(prop.GetTags())
+ return &digestEntry{
+ propID: prop.GetId(),
+ kind: parsed.Kind,
+ group: parsed.Group,
+ name: parsed.Name,
+ revision: parsed.UpdatedAt,
+ deleteTime: resp.GetDeleteTime(),
}
- return kinds
}
-func (r *SchemaRegistry) queryUpdatedSchemas(ctx context.Context, client
schemav1.SchemaUpdateServiceClient, sinceRevision int64) ([]string, error) {
- query := buildUpdatedSchemasQuery(sinceRevision)
- resp, rpcErr := client.AggregateSchemaUpdates(ctx,
&schemav1.AggregateSchemaUpdatesRequest{Query: query})
- if rpcErr != nil {
- return nil, rpcErr
+func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse)
{
+ prop := resp.GetProperty()
+ if prop == nil {
+ return
+ }
+ parsed := ParseTags(prop.GetTags())
+ kindStr := parsed.Kind
+ if kindStr == "" {
+ kindStr = prop.GetMetadata().GetName()
+ }
+ kind, kindErr := KindFromString(kindStr)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", kindStr).Msg("watch: unknown kind in
event")
+ return
+ }
+ r.l.Debug().Stringer("eventType", resp.GetEventType()).Stringer("kind",
kind).
+ Str("propID", prop.GetId()).Msg("watch: received event")
+ switch resp.GetEventType() {
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE:
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert property")
+ return
+ }
+ r.processInitialResourceFromProperty(kind, prop,
md.Spec.(proto.Message))
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
+ propID := prop.GetId()
+ if r.cache.Delete(propID, resp.GetDeleteTime()) {
+ md, convErr := ToSchema(kind, prop)
+ if convErr != nil {
+ r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("watch: failed to convert deleted property")
+ return
+ }
+ r.notifyHandlers(kind, md, true)
+ }
+ case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
+ // handled by processWatchSession, not expected here
}
- return resp.GetNames(), nil
}
-func (r *SchemaRegistry) shouldFullReconcile(round uint64) bool {
- return r.fullReconcileEvery > 0 && round%r.fullReconcileEvery == 0
+func (r *SchemaRegistry) performIncrementalSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ return
+ }
+ maxRev := r.cache.GetMaxRevision()
+ req := &syncRequest{
+ criteria: buildRevisionCriteria(maxRev),
+ }
+ r.l.Debug().Int64("maxRevision", maxRev).Msg("incremental sync:
requesting changes")
+ r.sendSyncRequest(ctx, sessions, req)
}
-type kindGroupPair struct {
- groupName string
- kind schema.Kind
+func (r *SchemaRegistry) getActiveSessions() map[string]*watchSession {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ sessions := make(map[string]*watchSession, len(r.watchSessions))
+ for name, s := range r.watchSessions {
+ sessions[name] = s
+ }
+ return sessions
}
-func (r *SchemaRegistry) initializeFromSchemaClient(ctx context.Context, sc
*schemaClient) error {
- groups, listErr := r.listGroupFromClient(ctx, sc.management)
- if listErr != nil {
- return listErr
- }
- var pairs []kindGroupPair
- for _, group := range groups {
- r.processInitialResource(schema.KindGroup, group)
- catalog := group.GetCatalog()
- groupName := group.GetMetadata().GetName()
- switch catalog {
- case commonv1.Catalog_CATALOG_STREAM:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindStream},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_MEASURE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindMeasure},
- kindGroupPair{groupName: groupName, kind:
schema.KindTopNAggregation},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_TRACE:
- pairs = append(pairs,
- kindGroupPair{groupName: groupName, kind:
schema.KindTrace},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRule},
- kindGroupPair{groupName: groupName, kind:
schema.KindIndexRuleBinding},
- )
- case commonv1.Catalog_CATALOG_PROPERTY:
- pairs = append(pairs, kindGroupPair{groupName:
groupName, kind: schema.KindProperty})
+func (r *SchemaRegistry) sendSyncRequest(ctx context.Context,
+ sessions map[string]*watchSession, req *syncRequest,
+) map[string][]*digestEntry {
+ r.l.Debug().Int("sessionCount", len(sessions)).Msg("sendSyncRequest:
dispatching to watch sessions")
+ sent := make(map[string]*watchSession, len(sessions))
+ for name, s := range sessions {
+ select {
+ case s.syncReqCh <- req:
+ sent[name] = s
default:
- r.l.Error().Stringer("catalog", catalog).Str("group",
groupName).Msg("unknown catalog type during initialization")
+ r.l.Warn().Str("node", name).Msg("sendSyncRequest:
channel full, skipping session")
}
}
- var wg sync.WaitGroup
- for _, pair := range pairs {
- currentPair := pair
- wg.Add(1)
- go func() {
- defer wg.Done()
- r.initResourcesFromClient(ctx, sc.management,
currentPair.kind, currentPair.groupName)
- }()
+ allDigests := make(map[string][]*digestEntry, len(sent))
+ for name, s := range sent {
+ select {
+ case digests := <-s.syncResCh:
+ allDigests[name] = digests
+ case <-time.After(30 * time.Second):
+ r.l.Warn().Str("node", name).Msg("sync timeout")
+ case <-ctx.Done():
+ return nil
+ }
}
- wg.Wait()
- return nil
+ return allDigests
}
-func (r *SchemaRegistry) listGroupFromClient(ctx context.Context, client
schemav1.SchemaManagementServiceClient) ([]*commonv1.Group, error) {
- query := buildSchemaQuery(schema.KindGroup, "", "", 0)
- results, queryErr := r.querySchemasFromClient(ctx, client, query)
- if queryErr != nil {
- return nil, queryErr
+func (r *SchemaRegistry) performFullSync(ctx context.Context) {
+ sessions := r.getActiveSessions()
+ if len(sessions) == 0 {
+ r.l.Debug().Msg("performFullSync: no active watch sessions,
skipping")
+ return
}
- var groups []*commonv1.Group
- for _, s := range results {
- if s.deleteTime > 0 {
- continue
- }
- md, convErr := ToSchema(schema.KindGroup, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Msg("failed to convert group
property")
- continue
- }
- g, ok := md.Spec.(*commonv1.Group)
- if !ok {
- continue
- }
- groups = append(groups, g)
+ req := &syncRequest{
+ tagProjection: basicTagKeys,
}
- return groups, nil
-}
-
-func (r *SchemaRegistry) initResourcesFromClient(ctx context.Context,
- client schemav1.SchemaManagementServiceClient, kind schema.Kind,
groupName string,
-) {
- query := buildSchemaQuery(kind, groupName, "", 0)
- results, queryErr := r.querySchemasFromClient(ctx, client, query)
- if queryErr != nil {
- r.l.Warn().Err(queryErr).Stringer("kind", kind).Str("group",
groupName).Msg("failed to list resources for init")
+ allDigests := r.sendSyncRequest(ctx, sessions, req)
+ if allDigests == nil {
return
}
- for _, s := range results {
- if s.deleteTime > 0 {
+
+ merged := r.mergeDigests(allDigests)
+ cachedEntries := r.cache.GetAllEntries()
+ r.l.Debug().Int("mergedCount", len(merged)).Int("cachedCount",
len(cachedEntries)).
+ Msg("performFullSync: comparing server digests with local
cache")
+
+ for propID, d := range merged {
+ if d.deleteTime > 0 {
+ if entry, exists := cachedEntries[propID]; exists {
+ kind, kindErr := KindFromString(d.kind)
+ if kindErr != nil {
+ r.l.Warn().Str("kind",
d.kind).Msg("full sync: unknown kind")
+ continue
+ }
+ r.handleDeletion(kind, propID, entry,
d.revision)
+ }
continue
}
- md, convErr := ToSchema(kind, s.property)
- if convErr != nil {
- r.l.Warn().Err(convErr).Stringer("kind",
kind).Msg("failed to convert property for init")
- continue
+ localEntry := cachedEntries[propID]
+ if localEntry == nil || localEntry.latestUpdateAt < d.revision {
+ kind, kindErr := KindFromString(d.kind)
+ if kindErr != nil {
+ r.l.Warn().Str("kind", d.kind).Msg("full sync:
unknown kind")
+ continue
+ }
+ r.l.Debug().Str("propID", propID).Stringer("kind",
kind).
+ Int64("serverRevision",
d.revision).Msg("performFullSync: fetching updated schema from server")
+ prop, err := r.getSchema(ctx, kind, d.group, d.name)
+ if err != nil {
+ r.l.Warn().Err(err).Str("propID",
propID).Msg("full sync: failed to get schema")
+ continue
+ }
+ if prop == nil {
+ continue
+ }
+ md, err := ToSchema(kind, prop)
+ if err != nil {
+ r.l.Warn().Err(err).Str("propID",
propID).Msg("full sync: failed to convert property")
+ continue
+ }
+ r.processInitialResourceFromProperty(kind, prop,
md.Spec.(proto.Message))
+ }
+ }
+
+ for propID, entry := range cachedEntries {
+ if _, exists := merged[propID]; !exists {
+ r.l.Debug().Str("propID", propID).Stringer("kind",
entry.kind).
+ Msg("performFullSync: cached entry not found on
server, deleting")
+ r.handleDeletion(entry.kind, propID, entry,
entry.latestUpdateAt)
}
- r.processInitialResourceFromProperty(kind, s.property,
md.Spec.(proto.Message))
}
}
-func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec
proto.Message) {
- prop, convErr := SchemaToProperty(kind, spec)
- if convErr != nil {
- r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to
convert spec to property for initial resource")
- return
+func (r *SchemaRegistry) mergeDigests(allDigests map[string][]*digestEntry)
map[string]*digestEntry {
+ merged := make(map[string]*digestEntry)
+ for _, digests := range allDigests {
+ for _, d := range digests {
+ existing, exists := merged[d.propID]
+ if !exists || d.revision > existing.revision ||
+ (d.revision == existing.revision &&
d.deleteTime > existing.deleteTime) {
+ merged[d.propID] = d
+ }
+ }
}
- r.processInitialResourceFromProperty(kind, prop, spec)
+ return merged
}
func (r *SchemaRegistry) processInitialResourceFromProperty(kind schema.Kind,
prop *propertyv1.Property, spec proto.Message) {
@@ -1345,6 +1365,8 @@ func (r *SchemaRegistry)
processInitialResourceFromProperty(kind schema.Kind, pr
}
func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry
*cacheEntry, revision int64) {
+ r.l.Debug().Stringer("kind", kind).Str("propID",
propID).Int64("revision", revision).
+ Msg("handleDeletion: attempting to delete from cache")
if r.cache.Delete(propID, revision) {
r.notifyHandlers(kind, schema.Metadata{
TypeMeta: schema.TypeMeta{
@@ -1372,6 +1394,21 @@ func (r *SchemaRegistry) notifyHandlers(kind
schema.Kind, md schema.Metadata, is
}
}
+func (r *SchemaRegistry) notifyHandlersFromCache() {
+ entries := r.cache.GetAllEntries()
+ r.l.Debug().Int("entryCount",
len(entries)).Msg("notifyHandlersFromCache: replaying cached entries to
handlers")
+ for _, entry := range entries {
+ r.notifyHandlers(entry.kind, schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: entry.kind,
+ Name: entry.name,
+ Group: entry.group,
+ },
+ Spec: entry.spec,
+ }, false)
+ }
+}
+
// ActiveNodeNames returns the names of all active schema server nodes.
func (r *SchemaRegistry) ActiveNodeNames() []string {
return r.connMgr.ActiveNames()
@@ -1382,6 +1419,12 @@ func (r *SchemaRegistry) Close() error {
if r.caCertReloader != nil {
r.caCertReloader.Stop()
}
+ r.watchMu.Lock()
+ for nodeName, session := range r.watchSessions {
+ session.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ r.watchMu.Unlock()
r.closer.Done()
r.closer.CloseThenWait()
r.connMgr.GracefulStop()
diff --git a/banyand/metadata/schema/property/client_test.go
b/banyand/metadata/schema/property/client_test.go
index ce3f9616a..4986e9192 100644
--- a/banyand/metadata/schema/property/client_test.go
+++ b/banyand/metadata/schema/property/client_test.go
@@ -22,7 +22,6 @@ import (
"fmt"
"net"
"strconv"
- "strings"
"sync"
"testing"
"time"
@@ -37,6 +36,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -46,7 +46,10 @@ import (
testflags "github.com/apache/skywalking-banyandb/pkg/test/flags"
)
-const testStreamName = "test-stream"
+const (
+ testStreamName = "test-stream"
+ testDeleteStreamName = "delete-stream"
+)
func startTestSchemaServer(t *testing.T) string {
t.Helper()
@@ -55,6 +58,12 @@ func startTestSchemaServer(t *testing.T) string {
}
func startTestSchemaServerStoppable(t *testing.T) (string, func()) {
+ t.Helper()
+ _, addr, stopFn := startTestSchemaServerFull(t)
+ return addr, stopFn
+}
+
+func startTestSchemaServerFull(t *testing.T) (schemaserver.Server, string,
func()) {
t.Helper()
srv := schemaserver.NewServer(observability.BypassRegistry)
flagSet := srv.FlagSet()
@@ -67,7 +76,9 @@ func startTestSchemaServerStoppable(t *testing.T) (string,
func()) {
require.NoError(t, srv.Validate())
require.NoError(t, srv.PreRun(context.Background()))
srv.Serve()
- t.Cleanup(func() { srv.GracefulStop() })
+ var once sync.Once
+ stopFn := func() { once.Do(func() { srv.GracefulStop() }) }
+ t.Cleanup(stopFn)
addr := net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(port),
10))
deadline := time.Now().Add(5 * time.Second)
for {
@@ -81,7 +92,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string,
func()) {
}
time.Sleep(50 * time.Millisecond)
}
- return addr, func() { srv.GracefulStop() }
+ return srv, addr, stopFn
}
func getFreePort(t *testing.T) uint32 {
@@ -899,21 +910,14 @@ func TestOnAddOrUpdate_NewNodeWithData(t *testing.T) {
reg.RegisterHandler("test-handler", schema.KindGroup|schema.KindStream,
handler)
// Now add server2 which has pre-existing data.
addNodeToRegistry(reg, "node-2", addr2)
- // Start triggers initializeFromSchemaClient for all active nodes.
+ // Start replays cached entries to handlers. Watch replay is async, so
+ // we need to wait for the handler to receive notifications.
reg.Start(context.Background())
// Handler should receive OnAddOrUpdate events for the group and stream
from server2.
- events := handler.getAddOrUpdateEvents()
- var hasGroup, hasStream bool
- for _, evt := range events {
- if evt.Kind == schema.KindGroup && evt.Name == "test-group" {
- hasGroup = true
- }
- if evt.Kind == schema.KindStream && evt.Name == testStreamName {
- hasStream = true
- }
- }
- assert.True(t, hasGroup, "handler should receive group OnAddOrUpdate,
events: %+v", events)
- assert.True(t, hasStream, "handler should receive stream OnAddOrUpdate,
events: %+v", events)
+ groupEvt := waitForAddOrUpdateEvent(t, handler, schema.KindGroup,
"test-group")
+ assert.Equal(t, schema.KindGroup, groupEvt.Kind)
+ streamEvt := waitForAddOrUpdateEvent(t, handler, schema.KindStream,
testStreamName)
+ assert.Equal(t, "test-group", streamEvt.Group)
// Registry should be able to read the stream.
ctx := context.Background()
got, getErr := reg.GetStream(ctx, &commonv1.Metadata{Group:
"test-group", Name: testStreamName})
@@ -1064,29 +1068,37 @@ func TestOnAddOrUpdate_HandlerNotification(t
*testing.T) {
reg.RegisterHandler("measure-handler", schema.KindMeasure,
measureHandler)
// Add server2 with pre-existing data.
addNodeToRegistry(reg, "node-2", addr2)
- // Start triggers initializeFromSchemaClient for all active nodes.
+ // Start replays cached entries to handlers. Watch replay is async.
reg.Start(context.Background())
- time.Sleep(500 * time.Millisecond)
- // Stream handler should get stream events but not measure events.
- streamEvents := streamHandler.getAddOrUpdateEvents()
- var streamHasStream bool
- for _, evt := range streamEvents {
- if evt.Kind == schema.KindStream && evt.Name == testStreamName {
- streamHasStream = true
+ // Wait for async watch replay to complete and handlers to receive
events.
+ require.Eventually(t, func() bool {
+ streamEvents := streamHandler.getAddOrUpdateEvents()
+ var streamHasStream bool
+ for _, evt := range streamEvents {
+ if evt.Kind == schema.KindStream && evt.Name ==
testStreamName {
+ streamHasStream = true
+ }
}
- assert.NotEqual(t, schema.KindMeasure, evt.Kind, "stream
handler should not receive measure events")
- }
- assert.True(t, streamHasStream, "stream handler should receive stream
OnAddOrUpdate")
- // Measure handler should get measure events but not stream events.
- measureEvents := measureHandler.getAddOrUpdateEvents()
- var measureHasMeasure bool
- for _, evt := range measureEvents {
- if evt.Kind == schema.KindMeasure && evt.Name == "test-measure"
{
- measureHasMeasure = true
+ measureEvents := measureHandler.getAddOrUpdateEvents()
+ var measureHasMeasure bool
+ for _, evt := range measureEvents {
+ if evt.Kind == schema.KindMeasure && evt.Name ==
"test-measure" {
+ measureHasMeasure = true
+ }
}
- assert.NotEqual(t, schema.KindStream, evt.Kind, "measure
handler should not receive stream events")
- }
- assert.True(t, measureHasMeasure, "measure handler should receive
measure OnAddOrUpdate")
+ return streamHasStream && measureHasMeasure
+ }, 5*time.Second, 100*time.Millisecond, "handlers should receive their
respective events")
+ // Verify handler isolation with concrete data checks.
+ streamEvents := streamHandler.getAddOrUpdateEvents()
+ assert.Len(t, streamEvents, 1, "stream handler should receive exactly 1
event")
+ assert.Equal(t, schema.KindStream, streamEvents[0].Kind)
+ assert.Equal(t, testStreamName, streamEvents[0].Name)
+ assert.Equal(t, "test-group", streamEvents[0].Group)
+ measureEvents := measureHandler.getAddOrUpdateEvents()
+ assert.Len(t, measureEvents, 1, "measure handler should receive exactly
1 event")
+ assert.Equal(t, schema.KindMeasure, measureEvents[0].Kind)
+ assert.Equal(t, "test-measure", measureEvents[0].Name)
+ assert.Equal(t, "measure-group", measureEvents[0].Group)
}
func startTestSchemaServerTLS(t *testing.T) string {
@@ -1183,9 +1195,8 @@ type storedSchema struct {
type recordingSchemaServer struct {
schemav1.UnimplementedSchemaManagementServiceServer
schemav1.UnimplementedSchemaUpdateServiceServer
- schemas map[string]*storedSchema
- sinceRevisions []int64
- mu sync.Mutex
+ schemas map[string]*storedSchema
+ mu sync.Mutex
}
func newRecordingSchemaServer() *recordingSchemaServer {
@@ -1200,27 +1211,18 @@ func (s *recordingSchemaServer) addSchema(prop
*propertyv1.Property) {
s.schemas[prop.GetId()] = &storedSchema{prop: prop}
}
-func (s *recordingSchemaServer) markDeleted(propID string) {
+func (s *recordingSchemaServer) updateSchema(prop *propertyv1.Property) {
s.mu.Lock()
defer s.mu.Unlock()
- if stored, ok := s.schemas[propID]; ok {
- stored.deleteTime = time.Now().UnixNano()
- }
-}
-
-func (s *recordingSchemaServer) aggregateCallCount() int {
- s.mu.Lock()
- defer s.mu.Unlock()
- return len(s.sinceRevisions)
+ s.schemas[prop.GetId()] = &storedSchema{prop: prop}
}
-func (s *recordingSchemaServer) lastSinceRevision() int64 {
+func (s *recordingSchemaServer) markDeleted(propID string) {
s.mu.Lock()
defer s.mu.Unlock()
- if len(s.sinceRevisions) == 0 {
- return 0
+ if stored, ok := s.schemas[propID]; ok {
+ stored.deleteTime = time.Now().UnixNano()
}
- return s.sinceRevisions[len(s.sinceRevisions)-1]
}
func (s *recordingSchemaServer) ListSchemas(req *schemav1.ListSchemasRequest,
stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse]) error {
@@ -1258,26 +1260,62 @@ func (s *recordingSchemaServer) ListSchemas(req
*schemav1.ListSchemasRequest, st
return nil
}
-func (s *recordingSchemaServer) AggregateSchemaUpdates(_ context.Context, req
*schemav1.AggregateSchemaUpdatesRequest)
(*schemav1.AggregateSchemaUpdatesResponse, error) {
- sinceRevision := extractSinceRevision(req.GetQuery())
- s.mu.Lock()
- s.sinceRevisions = append(s.sinceRevisions, sinceRevision)
- kindSet := make(map[string]struct{})
- for _, stored := range s.schemas {
- if stored.deleteTime > 0 {
- continue
+func (s *recordingSchemaServer) WatchSchemas(stream
grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse]) error {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+ metadataOnly := len(req.TagProjection) > 0
+ maxRevision := extractMaxRevisionFromCriteria(req.Criteria)
+ s.mu.Lock()
+ for _, stored := range s.schemas {
+ parsed := property.ParseTags(stored.prop.GetTags())
+ if maxRevision > 0 && parsed.UpdatedAt <= maxRevision {
+ continue
+ }
+ eventType :=
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT
+ if stored.deleteTime > 0 {
+ eventType =
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE
+ }
+ if sendErr :=
stream.Send(&schemav1.WatchSchemasResponse{
+ EventType: eventType,
+ Property: stored.prop,
+ MetadataOnly: metadataOnly,
+ DeleteTime: stored.deleteTime,
+ }); sendErr != nil {
+ s.mu.Unlock()
+ return sendErr
+ }
}
- parsed := property.ParseTags(stored.prop.GetTags())
- if parsed.UpdatedAt > sinceRevision {
- kindSet[stored.prop.GetMetadata().GetName()] =
struct{}{}
+ s.mu.Unlock()
+ if sendErr := stream.Send(&schemav1.WatchSchemasResponse{
+ EventType:
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+ }); sendErr != nil {
+ return sendErr
}
}
- s.mu.Unlock()
- names := make([]string, 0, len(kindSet))
- for kindName := range kindSet {
- names = append(names, kindName)
+}
+
+// extractMaxRevisionFromCriteria extracts the max_revision value from a
Criteria condition.
+// It looks for a condition on "updated_at" with BINARY_OP_GT.
+func extractMaxRevisionFromCriteria(c *modelv1.Criteria) int64 {
+ if c == nil {
+ return 0
+ }
+ if cond := c.GetCondition(); cond != nil {
+ if cond.GetName() == "updated_at" && cond.GetOp() ==
modelv1.Condition_BINARY_OP_GT {
+ return cond.GetValue().GetInt().GetValue()
+ }
+ return 0
+ }
+ if le := c.GetLe(); le != nil {
+ if v := extractMaxRevisionFromCriteria(le.GetLeft()); v > 0 {
+ return v
+ }
+ return extractMaxRevisionFromCriteria(le.GetRight())
}
- return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+ return 0
}
func (s *recordingSchemaServer) InsertSchema(_ context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
@@ -1299,14 +1337,6 @@ func (s *recordingSchemaServer) DeleteSchema(_
context.Context, req *schemav1.De
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
-func extractSinceRevision(query *propertyv1.QueryRequest) int64 {
- cond := query.GetCriteria().GetCondition()
- if cond != nil && cond.GetName() == "updated_at" {
- return cond.GetValue().GetInt().GetValue()
- }
- return 0
-}
-
func startRecordingServer(t *testing.T) (*recordingSchemaServer, string) {
t.Helper()
mock := newRecordingSchemaServer()
@@ -1372,30 +1402,35 @@ func buildMockStreamProperty(t *testing.T, name string,
updatedAt time.Time) *pr
}
func TestSyncLoop(t *testing.T) {
- t.Run("incremental_sync_calls_aggregate", func(t *testing.T) {
+ t.Run("full_reconcile_detects_deletion", func(t *testing.T) {
mock, addr := startRecordingServer(t)
mock.addSchema(buildMockGroupProperty(t))
mock.addSchema(buildMockStreamProperty(t, testStreamName,
time.Now()))
+ streamPropID := property.BuildPropertyID(schema.KindStream,
&commonv1.Metadata{Group: "test-group", Name: testStreamName})
reg := newTestRegistryWithConfig(t, &property.ClientConfig{
- SyncInterval: 50 * time.Millisecond,
- FullReconcileEvery: 100,
+ SyncInterval: 50 * time.Millisecond,
}, addr)
handler := &testEventHandler{}
reg.RegisterHandler("sync-handler",
schema.KindGroup|schema.KindStream, handler)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, reg.Start(ctx))
- require.Eventually(t, func() bool {
- return mock.aggregateCallCount() >= 3
- }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected
at least 3 AggregateSchemaUpdates calls")
+ initEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, testStreamName)
+ assert.Equal(t, "test-group", initEvt.Group)
+ // Mark stream as deleted on the server.
+ mock.markDeleted(streamPropID)
+ // Full reconcile should detect the deletion and notify the
handler.
+ deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream,
testStreamName)
+ assert.Equal(t, "test-group", deleteEvt.Group)
})
- t.Run("full_sync_skips_aggregate", func(t *testing.T) {
+ t.Run("full_reconcile_detects_update", func(t *testing.T) {
mock, addr := startRecordingServer(t)
mock.addSchema(buildMockGroupProperty(t))
- mock.addSchema(buildMockStreamProperty(t, testStreamName,
time.Now()))
+ initialTime := time.Now()
+ mock.addSchema(buildMockStreamProperty(t, testStreamName,
initialTime))
reg := newTestRegistryWithConfig(t, &property.ClientConfig{
- SyncInterval: 50 * time.Millisecond,
+ SyncInterval: 500 * time.Millisecond,
FullReconcileEvery: 1,
}, addr)
handler := &testEventHandler{}
@@ -1403,26 +1438,30 @@ func TestSyncLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, reg.Start(ctx))
- // Wait for init to deliver the stream.
+ waitForAddOrUpdateEvent(t, handler, schema.KindStream,
testStreamName)
+ // Update the stream on the server with a newer revision.
+ mock.updateSchema(buildMockStreamProperty(t, testStreamName,
initialTime.Add(10*time.Second)))
+ // Full reconcile should detect the higher revision and
re-fetch + update cache.
require.Eventually(t, func() bool {
+ count := 0
for _, evt := range handler.getAddOrUpdateEvents() {
if evt.Kind == schema.KindStream && evt.Name ==
testStreamName {
- return true
+ count++
}
}
- return false
- }, testflags.EventuallyTimeout, 20*time.Millisecond)
- // Let several sync rounds fire (all full).
- time.Sleep(300 * time.Millisecond)
- assert.Equal(t, 0, mock.aggregateCallCount(), "full sync should
not call AggregateSchemaUpdates")
+ return count >= 2
+ }, testflags.EventuallyTimeout, 20*time.Millisecond, "full
reconcile should detect updated stream")
+ latestEvt := findLastEvent(handler.getAddOrUpdateEvents(),
schema.KindStream, testStreamName)
+ assert.Equal(t, "test-group", latestEvt.Group)
})
- t.Run("sinceRevision_advances_with_new_data", func(t *testing.T) {
- mock, addr := startRecordingServer(t)
- mock.addSchema(buildMockGroupProperty(t))
- mock.addSchema(buildMockStreamProperty(t, testStreamName,
time.Now()))
+ t.Run("incremental_sync_picks_up_new_data", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"initial-stream", time.Now()))
reg := newTestRegistryWithConfig(t, &property.ClientConfig{
- SyncInterval: 50 * time.Millisecond,
+ SyncInterval: 500 * time.Millisecond,
FullReconcileEvery: 100,
}, addr)
handler := &testEventHandler{}
@@ -1430,43 +1469,22 @@ func TestSyncLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, reg.Start(ctx))
- // Wait for at least 2 aggregate calls to establish a baseline
revision.
- require.Eventually(t, func() bool {
- return mock.aggregateCallCount() >= 2
- }, testflags.EventuallyTimeout, 20*time.Millisecond)
- rev0 := mock.lastSinceRevision()
- assert.Greater(t, rev0, int64(0), "sinceRevision should be > 0
after init")
- // With no new data, sinceRevision should remain stable.
- countBefore := mock.aggregateCallCount()
- require.Eventually(t, func() bool {
- return mock.aggregateCallCount() >= countBefore+2
- }, testflags.EventuallyTimeout, 20*time.Millisecond)
- assert.Equal(t, rev0, mock.lastSinceRevision(), "sinceRevision
should not change without new data")
- // Add a new stream with a far-future updatedAt.
- futureTime := time.Now().Add(time.Hour)
- mock.addSchema(buildMockStreamProperty(t, "new-stream",
futureTime))
- // sinceRevision should advance after the new data is picked up.
- require.Eventually(t, func() bool {
- return mock.lastSinceRevision() > rev0
- }, testflags.EventuallyTimeout, 20*time.Millisecond,
"sinceRevision should advance after new data")
- // Handler should have received the new stream.
- require.Eventually(t, func() bool {
- for _, evt := range handler.getAddOrUpdateEvents() {
- if evt.Kind == schema.KindStream && evt.Name ==
"new-stream" {
- return true
- }
- }
- return false
- }, testflags.EventuallyTimeout, 20*time.Millisecond, "handler
should receive OnAddOrUpdate for new-stream")
+ waitForServerWatcher(t, srv)
+ waitForAddOrUpdateEvent(t, handler, schema.KindStream,
"initial-stream")
+ // Insert new stream directly on server.
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"incremental-stream", time.Now()))
+ found := waitForAddOrUpdateEvent(t, handler, schema.KindStream,
"incremental-stream")
+ assert.Equal(t, "test-group", found.Group)
})
- t.Run("full_reconcile_detects_deletion", func(t *testing.T) {
+ t.Run("full_reconcile_every_1_runs_each_tick", func(t *testing.T) {
mock, addr := startRecordingServer(t)
mock.addSchema(buildMockGroupProperty(t))
- mock.addSchema(buildMockStreamProperty(t, testStreamName,
time.Now()))
- streamPropID := property.BuildPropertyID(schema.KindStream,
&commonv1.Metadata{Group: "test-group", Name: testStreamName})
+ mock.addSchema(buildMockStreamProperty(t, "s1", time.Now()))
+ mock.addSchema(buildMockStreamProperty(t, "s2", time.Now()))
+ s2PropID := property.BuildPropertyID(schema.KindStream,
&commonv1.Metadata{Group: "test-group", Name: "s2"})
reg := newTestRegistryWithConfig(t, &property.ClientConfig{
- SyncInterval: 50 * time.Millisecond,
+ SyncInterval: 500 * time.Millisecond,
FullReconcileEvery: 1,
}, addr)
handler := &testEventHandler{}
@@ -1474,25 +1492,220 @@ func TestSyncLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, reg.Start(ctx))
- // Wait for init to deliver the stream.
- require.Eventually(t, func() bool {
- for _, evt := range handler.getAddOrUpdateEvents() {
- if evt.Kind == schema.KindStream && evt.Name ==
testStreamName {
- return true
- }
+ waitForAddOrUpdateEvent(t, handler, schema.KindStream, "s1")
+ waitForAddOrUpdateEvent(t, handler, schema.KindStream, "s2")
+ // Mark s2 as deleted. Full reconcile (every tick) should
detect it quickly.
+ mock.markDeleted(s2PropID)
+ deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream,
"s2")
+ assert.Equal(t, "test-group", deleteEvt.Group)
+ })
+}
+
+func waitForAddOrUpdateEvent(t *testing.T, handler *testEventHandler, kind
schema.Kind, name string) schema.Metadata {
+ t.Helper()
+ require.Eventually(t, func() bool {
+ for _, evt := range handler.getAddOrUpdateEvents() {
+ if evt.Kind == kind && evt.Name == name {
+ return true
}
- return false
- }, testflags.EventuallyTimeout, 20*time.Millisecond)
- // Mark stream as deleted on the server.
- mock.markDeleted(streamPropID)
- // Full reconcile should detect the deletion and notify the
handler.
- require.Eventually(t, func() bool {
- for _, evt := range handler.getDeleteEvents() {
- if evt.Kind == schema.KindStream &&
strings.Contains(evt.Name, testStreamName) {
- return true
- }
+ }
+ return false
+ }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected
AddOrUpdate event for %s/%s", kind, name)
+ return findLastEvent(handler.getAddOrUpdateEvents(), kind, name)
+}
+
+func waitForDeleteEvent(t *testing.T, handler *testEventHandler, kind
schema.Kind, name string) schema.Metadata {
+ t.Helper()
+ require.Eventually(t, func() bool {
+ for _, evt := range handler.getDeleteEvents() {
+ if evt.Kind == kind && evt.Name == name {
+ return true
}
- return false
- }, testflags.EventuallyTimeout, 20*time.Millisecond, "handler
should receive OnDelete for test-stream")
+ }
+ return false
+ }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected Delete
event for %s/%s", kind, name)
+ return findLastEvent(handler.getDeleteEvents(), kind, name)
+}
+
+func findLastEvent(events []schema.Metadata, kind schema.Kind, name string)
schema.Metadata {
+ for idx := len(events) - 1; idx >= 0; idx-- {
+ if events[idx].Kind == kind && events[idx].Name == name {
+ return events[idx]
+ }
+ }
+ return schema.Metadata{}
+}
+
+func waitForServerWatcher(t *testing.T, srv schemaserver.Server) {
+ t.Helper()
+ type watcherCounter interface{ WatcherCount() int }
+ wc, ok := srv.(watcherCounter)
+ require.True(t, ok, "server does not implement WatcherCount")
+ require.Eventually(t, func() bool {
+ return wc.WatcherCount() > 0
+ }, testflags.EventuallyTimeout, 20*time.Millisecond, "expected at least
one watcher to connect")
+}
+
+func insertSchemaProperty(t *testing.T, mgmt
schemav1.SchemaManagementServiceClient, prop *propertyv1.Property) {
+ t.Helper()
+ _, err := mgmt.InsertSchema(context.Background(),
&schemav1.InsertSchemaRequest{Property: prop})
+ require.NoError(t, err)
+}
+
+func updateSchemaProperty(t *testing.T, mgmt
schemav1.SchemaManagementServiceClient, prop *propertyv1.Property) {
+ t.Helper()
+ _, err := mgmt.UpdateSchema(context.Background(),
&schemav1.UpdateSchemaRequest{Property: prop})
+ require.NoError(t, err)
+}
+
+func deleteSchemaProperty(t *testing.T, mgmt
schemav1.SchemaManagementServiceClient, name, id string) {
+ t.Helper()
+ _, err := mgmt.DeleteSchema(context.Background(),
&schemav1.DeleteSchemaRequest{
+ Delete: &propertyv1.DeleteRequest{Group: schema.SchemaGroup,
Name: name, Id: id},
+ UpdateAt: timestamppb.Now(),
+ })
+ require.NoError(t, err)
+}
+
+func TestWatchPush(t *testing.T) {
+ t.Run("watch_push_insert", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv)
+ // Insert a new stream via the real server RPC — triggers watch
broadcast.
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"watch-stream", time.Now()))
+ insertEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, "watch-stream")
+ assert.Equal(t, "test-group", insertEvt.Group)
+ })
+
+ t.Run("watch_push_update", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"update-stream", time.Now().Add(-time.Hour)))
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv)
+ initEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, "update-stream")
+ assert.Equal(t, "test-group", initEvt.Group)
+ initialCount := len(handler.getAddOrUpdateEvents())
+ // Update with a newer timestamp via the real server RPC.
+ updateSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"update-stream", time.Now()))
+ require.Eventually(t, func() bool {
+ return len(handler.getAddOrUpdateEvents()) >
initialCount
+ }, testflags.EventuallyTimeout, 20*time.Millisecond, "handler
should receive watch push update")
+ lastEvt := findLastEvent(handler.getAddOrUpdateEvents(),
schema.KindStream, "update-stream")
+ assert.Equal(t, "test-group", lastEvt.Group)
+ })
+
+ t.Run("watch_push_delete", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ streamProp := buildMockStreamProperty(t, testDeleteStreamName,
time.Now())
+ insertSchemaProperty(t, mgmt, streamProp)
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv)
+ initEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, testDeleteStreamName)
+ assert.Equal(t, "test-group", initEvt.Group)
+ // Delete via the real server RPC — triggers watch broadcast.
+ deleteSchemaProperty(t, mgmt,
streamProp.GetMetadata().GetName(), streamProp.GetId())
+ deleteEvt := waitForDeleteEvent(t, handler, schema.KindStream,
testDeleteStreamName)
+ assert.Equal(t, "test-group", deleteEvt.Group)
+ })
+
+ t.Run("watch_older_revision_ignored", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ now := time.Now()
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"rev-stream", now))
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv)
+ initEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, "rev-stream")
+ assert.Equal(t, "test-group", initEvt.Group)
+ initialCount := len(handler.getAddOrUpdateEvents())
+ // Update with an older version — should be ignored by cache
revision check.
+ updateSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"rev-stream", now.Add(-time.Hour)))
+ time.Sleep(200 * time.Millisecond)
+ assert.Equal(t, initialCount,
len(handler.getAddOrUpdateEvents()), "older revision should not trigger
handler")
+ })
+
+ t.Run("watch_new_node_added", func(t *testing.T) {
+ srv1, addr1, _ := startTestSchemaServerFull(t)
+ mgmt1 := rawMgmtClient(t, addr1)
+ insertSchemaProperty(t, mgmt1, buildMockGroupProperty(t))
+ srv2, addr2, _ := startTestSchemaServerFull(t)
+ mgmt2 := rawMgmtClient(t, addr2)
+ insertSchemaProperty(t, mgmt2, buildMockGroupProperty(t))
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr1)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv1)
+ // Dynamically add the second server.
+ addNodeToRegistry(reg, "new-node", addr2)
+ waitForServerWatcher(t, srv2)
+ // Insert via the new server — client should receive it through
watch.
+ insertSchemaProperty(t, mgmt2, buildMockStreamProperty(t,
"new-node-stream", time.Now()))
+ newNodeEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, "new-node-stream")
+ assert.Equal(t, "test-group", newNodeEvt.Group)
+ })
+
+ t.Run("watch_server_restart", func(t *testing.T) {
+ srv, addr, _ := startTestSchemaServerFull(t)
+ mgmt := rawMgmtClient(t, addr)
+ insertSchemaProperty(t, mgmt, buildMockGroupProperty(t))
+ reg := newTestRegistryWithConfig(t, &property.ClientConfig{
+ SyncInterval: 10 * time.Minute,
+ }, addr)
+ handler := &testEventHandler{}
+ reg.RegisterHandler("watch-handler",
schema.KindGroup|schema.KindStream, handler)
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ require.NoError(t, reg.Start(ctx))
+ waitForServerWatcher(t, srv)
+ // Verify initial watcher count.
+ type watcherCounter interface{ WatcherCount() int }
+ wc, ok := srv.(watcherCounter)
+ require.True(t, ok, "server does not implement WatcherCount")
+ assert.Equal(t, 1, wc.WatcherCount())
+ // Insert via the real server RPC — should be received through
watch.
+ insertSchemaProperty(t, mgmt, buildMockStreamProperty(t,
"before-restart", time.Now()))
+ restartEvt := waitForAddOrUpdateEvent(t, handler,
schema.KindStream, "before-restart")
+ assert.Equal(t, "test-group", restartEvt.Group)
})
}
diff --git a/banyand/metadata/schema/property/converter.go
b/banyand/metadata/schema/property/converter.go
index af6051c24..af6987731 100644
--- a/banyand/metadata/schema/property/converter.go
+++ b/banyand/metadata/schema/property/converter.go
@@ -166,6 +166,28 @@ func ParseTags(tags []*modelv1.Tag) ParsedTags {
return pt
}
+// basicTagKeys are the tag keys used for metadata-only projections (full sync
digest).
+var basicTagKeys = []string{TagKeyKind, TagKeyGroup, TagKeyName,
TagKeyUpdatedAt}
+
+// buildRevisionCriteria builds a Criteria for updated_at > sinceRevision.
+// Returns nil if sinceRevision <= 0.
+func buildRevisionCriteria(sinceRevision int64) *modelv1.Criteria {
+ if sinceRevision <= 0 {
+ return nil
+ }
+ return &modelv1.Criteria{
+ Exp: &modelv1.Criteria_Condition{
+ Condition: &modelv1.Condition{
+ Name: TagKeyUpdatedAt,
+ Op: modelv1.Condition_BINARY_OP_GT,
+ Value: &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: sinceRevision}},
+ },
+ },
+ },
+ }
+}
+
func buildSchemaQuery(kind schema.Kind, group, name string, sinceRevision
int64) *propertyv1.QueryRequest {
query := &propertyv1.QueryRequest{
Groups: []string{schema.SchemaGroup},
@@ -219,24 +241,39 @@ func buildSchemaQuery(kind schema.Kind, group, name
string, sinceRevision int64)
return query
}
-func buildUpdatedSchemasQuery(sinceRevision int64) *propertyv1.QueryRequest {
- query := &propertyv1.QueryRequest{
- Groups: []string{schema.SchemaGroup},
+// ParsePropID parses a property ID into kind, group, and name.
+// Format: "<kind>_<group>/<name>" or "<kind>_<name>" (for KindGroup).
+func ParsePropID(propID string) (kind schema.Kind, group, name string, err
error) {
+ underscoreIdx := -1
+ for idx := range propID {
+ if propID[idx] == '_' {
+ underscoreIdx = idx
+ break
+ }
}
- if sinceRevision > 0 {
- query.Criteria = &modelv1.Criteria{
- Exp: &modelv1.Criteria_Condition{
- Condition: &modelv1.Condition{
- Name: TagKeyUpdatedAt,
- Op: modelv1.Condition_BINARY_OP_GT,
- Value: &modelv1.TagValue{
- Value:
&modelv1.TagValue_Int{Int: &modelv1.Int{Value: sinceRevision}},
- },
- },
- },
+ if underscoreIdx < 0 {
+ return 0, "", "", fmt.Errorf("invalid propID format: %s",
propID)
+ }
+ kindStr := propID[:underscoreIdx]
+ rest := propID[underscoreIdx+1:]
+ kind, err = KindFromString(kindStr)
+ if err != nil {
+ return 0, "", "", err
+ }
+ if kind == schema.KindGroup {
+ return kind, "", rest, nil
+ }
+ slashIdx := -1
+ for idx := range rest {
+ if rest[idx] == '/' {
+ slashIdx = idx
+ break
}
}
- return query
+ if slashIdx < 0 {
+ return 0, "", "", fmt.Errorf("invalid propID format (missing
group/name separator): %s", propID)
+ }
+ return kind, rest[:slashIdx], rest[slashIdx+1:], nil
}
// buildDeleteRequest builds a DeleteSchema request.
diff --git a/banyand/metadata/schema/property/converter_test.go
b/banyand/metadata/schema/property/converter_test.go
index e9393e34c..5c14d9832 100644
--- a/banyand/metadata/schema/property/converter_test.go
+++ b/banyand/metadata/schema/property/converter_test.go
@@ -417,17 +417,30 @@ func TestBuildSchemaQuery_WithNameIgnoresSinceRevision(t
*testing.T) {
assert.Nil(t, query.GetCriteria(), "name-based lookup should not have
criteria even with sinceRevision")
}
-func TestBuildUpdatedSchemasQuery_Zero(t *testing.T) {
- query := buildUpdatedSchemasQuery(0)
- assert.Nil(t, query.GetCriteria())
- assert.Empty(t, query.GetName())
- assert.Equal(t, []string{schema.SchemaGroup}, query.GetGroups())
-}
-
-func TestBuildUpdatedSchemasQuery_Positive(t *testing.T) {
- query := buildUpdatedSchemasQuery(100)
- assert.NotNil(t, query.GetCriteria())
- assert.Equal(t, []string{schema.SchemaGroup}, query.GetGroups())
+func TestParsePropID(t *testing.T) {
+ kind, group, name, err := ParsePropID("stream_g1/s1")
+ require.NoError(t, err)
+ assert.Equal(t, schema.KindStream, kind)
+ assert.Equal(t, "g1", group)
+ assert.Equal(t, "s1", name)
+
+ kind, group, name, err = ParsePropID("group_my-group")
+ require.NoError(t, err)
+ assert.Equal(t, schema.KindGroup, kind)
+ assert.Empty(t, group)
+ assert.Equal(t, "my-group", name)
+
+ invalidKind, invalidGroup, invalidName, err := ParsePropID("invalid")
+ assert.Error(t, err)
+ assert.Equal(t, schema.Kind(0), invalidKind)
+ assert.Empty(t, invalidGroup)
+ assert.Empty(t, invalidName)
+
+ unknownKind, unknownGroup, unknownName, err :=
ParsePropID("unknown_g1/s1")
+ assert.Error(t, err)
+ assert.Equal(t, schema.Kind(0), unknownKind)
+ assert.Empty(t, unknownGroup)
+ assert.Empty(t, unknownName)
}
func TestBuildDeleteRequest(t *testing.T) {
diff --git a/banyand/metadata/schema/property/schema_client.go
b/banyand/metadata/schema/property/schema_client.go
index 84d878988..0197418a3 100644
--- a/banyand/metadata/schema/property/schema_client.go
+++ b/banyand/metadata/schema/property/schema_client.go
@@ -45,6 +45,7 @@ var _ grpchelper.Client = (*schemaClient)(nil)
 type connectionHandler struct {
 	l              *logger.Logger
+	// registry, when non-nil, is told about node activation changes:
+	// OnActive calls registry.launchWatch and OnInactive calls registry.stopWatch.
+	registry       *SchemaRegistry
 	caCertReloader *pkgtls.Reloader
 	tlsEnabled     bool
 }
@@ -81,11 +82,17 @@ func (h *connectionHandler) NewClient(conn
*grpc.ClientConn, _ *databasev1.Node)
}
 // OnActive is called when a node transitions to active.
+// If a registry is configured, the node's client is handed to
+// registry.launchWatch (presumably opening a schema watch stream against that
+// node — confirm in SchemaRegistry).
-func (h *connectionHandler) OnActive(name string, _ *schemaClient) {
+func (h *connectionHandler) OnActive(name string, client *schemaClient) {
 	h.l.Info().Str("node", name).Msg("schema server node is active")
+	if h.registry != nil {
+		h.registry.launchWatch(name, client)
+	}
 }
 // OnInactive is called when a node leaves active.
+// If a registry is configured, registry.stopWatch is called for the node
+// (presumably tearing down the watch started by OnActive).
 func (h *connectionHandler) OnInactive(name string, _ *schemaClient) {
 	h.l.Info().Str("node", name).Msg("schema server node is inactive")
+	if h.registry != nil {
+		h.registry.stopWatch(name)
+	}
 }
diff --git a/banyand/metadata/schema/schemaserver/grpc.go
b/banyand/metadata/schema/schemaserver/grpc.go
index 5929cbe92..e1b3a6272 100644
--- a/banyand/metadata/schema/schemaserver/grpc.go
+++ b/banyand/metadata/schema/schemaserver/grpc.go
@@ -19,6 +19,8 @@ package schemaserver
import (
"context"
+ "errors"
+ "io"
"time"
"google.golang.org/grpc"
@@ -26,6 +28,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -55,9 +58,10 @@ func newServerMetrics(factory observability.Factory)
*serverMetrics {
 type schemaManagementServer struct {
 	schemav1.UnimplementedSchemaManagementServiceServer
-	server  *server
-	l       *logger.Logger
-	metrics *serverMetrics
+	server   *server
+	// watchers receives an event after every successful insert, update and
+	// delete, which it fans out to the open WatchSchemas streams.
+	watchers *watcherManager
+	l        *logger.Logger
+	metrics  *serverMetrics
 }
// InsertSchema inserts a new schema property.
@@ -101,6 +105,10 @@ func (s *schemaManagementServer) InsertSchema(ctx
context.Context, req *schemav1
s.l.Error().Err(updateErr).Msg("failed to insert schema")
return nil, updateErr
}
+ s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+ EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT,
+ Property: req.Property,
+ })
return &schemav1.InsertSchemaResponse{}, nil
}
@@ -128,6 +136,10 @@ func (s *schemaManagementServer) UpdateSchema(ctx
context.Context, req *schemav1
s.l.Error().Err(updateErr).Msg("failed to update schema")
return nil, updateErr
}
+ s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+ EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE,
+ Property: req.Property,
+ })
return &schemav1.UpdateSchemaResponse{}, nil
}
@@ -206,11 +218,19 @@ func (s *schemaManagementServer) DeleteSchema(ctx
context.Context, req *schemav1
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
ids := make([][]byte, 0, len(results))
+ var deletedProps []*propertyv1.Property
for _, result := range results {
if result.DeleteTime() > 0 {
continue
}
ids = append(ids, result.ID())
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr != nil {
+ s.metrics.totalErr.Inc(1, "delete")
+ s.l.Error().Err(unmarshalErr).Msg("failed to unmarshal
property for delete broadcast")
+ return nil, unmarshalErr
+ }
+ deletedProps = append(deletedProps, &p)
}
if len(ids) == 0 {
return &schemav1.DeleteSchemaResponse{Found: false}, nil
@@ -220,6 +240,14 @@ func (s *schemaManagementServer) DeleteSchema(ctx
context.Context, req *schemav1
s.l.Error().Err(deleteErr).Msg("failed to delete schema")
return nil, deleteErr
}
+ deleteTimeNano := req.UpdateAt.AsTime().UnixNano()
+ for _, dp := range deletedProps {
+ s.watchers.Broadcast(&schemav1.WatchSchemasResponse{
+ EventType:
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE,
+ Property: dp,
+ DeleteTime: deleteTimeNano,
+ })
+ }
return &schemav1.DeleteSchemaResponse{Found: true}, nil
}
@@ -252,47 +280,119 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
 type schemaUpdateServer struct {
 	schemav1.UnimplementedSchemaUpdateServiceServer
-	server  *server
-	l       *logger.Logger
-	metrics *serverMetrics
+	server   *server
+	// watchers gives each WatchSchemas stream its own live event channel.
+	watchers *watcherManager
+	l        *logger.Logger
+	metrics  *serverMetrics
 }
-// AggregateSchemaUpdates returns distinct schema names that have been modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-	req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-	s.metrics.totalStarted.Inc(1, "aggregate")
-	start := time.Now()
-	defer func() {
-		s.metrics.totalFinished.Inc(1, "aggregate")
-		s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+//
+// Each request received on the stream triggers a replay of the stored schema
+// properties matching the request's criteria. A non-empty tag_projection keeps
+// only those tags and flags the events metadata_only. The replay is followed
+// by a REPLAY_DONE marker. Between replays, live INSERT/UPDATE/DELETE events
+// broadcast by the management service are forwarded to the client.
+//
+// Live events that arrive while a replay is running wait in the watcher
+// channel and are sent after REPLAY_DONE. A client may therefore see a live
+// event for a property it already received during that replay.
+// NOTE(review): the watcher channel is bounded and watcherManager drops events
+// for a full channel, so a very long replay can lose live events — confirm
+// clients tolerate this.
+func (s *schemaUpdateServer) WatchSchemas(
+	stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, schemav1.WatchSchemasResponse],
+) error {
+	id, ch := s.watchers.Subscribe()
+	defer s.watchers.Unsubscribe(id)
+
+	// Requests are read on a separate goroutine so the main loop can multiplex
+	// them with live events. The reader closes reqCh when it exits. It stores a
+	// non-EOF receive error in reqErrCh before that close, so the main loop can
+	// always recover the error once it sees reqCh closed.
+	reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+	reqErrCh := make(chan error, 1)
+	go func() {
+		defer close(reqCh)
+		for {
+			req, err := stream.Recv()
+			if err != nil {
+				if !errors.Is(err, io.EOF) {
+					reqErrCh <- err
+				}
+				return
+			}
+			// Also watch the stream context: once the handler has returned
+			// nobody reads reqCh, and an unguarded send would leak this goroutine.
+			select {
+			case reqCh <- req:
+			case <-stream.Context().Done():
+				return
+			}
+		}
 	}()
-	if req.Query == nil {
-		return nil, errInvalidRequest("query is required")
+
+	for {
+		select {
+		case <-stream.Context().Done():
+			return stream.Context().Err()
+		case err := <-reqErrCh:
+			return err
+		case req, ok := <-reqCh:
+			if !ok {
+				// The reader exited. Its error and the close become ready
+				// together, and select chooses among ready cases at random.
+				// Check for the error explicitly so it is never reported as a
+				// clean end of stream.
+				select {
+				case err := <-reqErrCh:
+					return err
+				default:
+					return nil
+				}
+			}
+			if replayErr := s.replaySchemas(stream, req); replayErr != nil {
+				return replayErr
+			}
+			if doneErr := stream.Send(&schemav1.WatchSchemasResponse{
+				EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+			}); doneErr != nil {
+				return doneErr
+			}
+		case resp, ok := <-ch:
+			if !ok {
+				// The watcher manager closed our channel (e.g. Close during
+				// GracefulStop), so no more live events will arrive.
+				return nil
+			}
+			if sendErr := stream.Send(resp); sendErr != nil {
+				return sendErr
+			}
+		}
 	}
-	req.Query.Groups = []string{schema.SchemaGroup}
-	results, queryErr := s.server.db.Query(ctx, req.Query)
-	if queryErr != nil {
-		s.metrics.totalErr.Inc(1, "aggregate")
-		s.l.Error().Err(queryErr).Msg("failed to aggregate schema updates")
-		return nil, queryErr
+}
+
+// replaySchemas queries the schema group with req.Criteria and sends every
+// matching stored property to the client. Rows with a non-zero delete time are
+// sent as DELETE events carrying that delete time. All other rows are sent as
+// INSERT events. When req.TagProjection is non-empty, only the projected tags
+// are kept and each event is flagged MetadataOnly.
+func (s *schemaUpdateServer) replaySchemas(
+	stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, schemav1.WatchSchemasResponse],
+	req *schemav1.WatchSchemasRequest,
+) error {
+	metadataOnly := len(req.TagProjection) > 0
+	query := &propertyv1.QueryRequest{
+		Groups:   []string{schema.SchemaGroup},
+		Criteria: req.Criteria,
+	}
+	results, err := s.server.db.Query(stream.Context(), query)
+	if err != nil {
+		// Count and log like the unmarshal failure below and the other handlers.
+		s.metrics.totalErr.Inc(1, "replay")
+		s.l.Error().Err(err).Msg("failed to query schemas for replay")
+		return err
 	}
-	nameSet := make(map[string]struct{}, len(results))
 	for _, result := range results {
 		var p propertyv1.Property
 		if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr != nil {
-			s.metrics.totalErr.Inc(1, "aggregate")
-			return nil, unmarshalErr
+			s.metrics.totalErr.Inc(1, "replay")
+			return unmarshalErr
+		}
+		// Projection trims the payload to the requested tags; the event is
+		// flagged MetadataOnly below.
+		if metadataOnly {
+			p.Tags = filterTags(p.Tags, req.TagProjection)
 		}
-		if p.Metadata != nil && p.Metadata.Name != "" {
-			nameSet[p.Metadata.Name] = struct{}{}
+		eventType := schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT
+		dt := result.DeleteTime()
+		if dt > 0 {
+			eventType = schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE
+		}
+		if sendErr := stream.Send(&schemav1.WatchSchemasResponse{
+			EventType:    eventType,
+			Property:     &p,
+			MetadataOnly: metadataOnly,
+			DeleteTime:   dt,
+		}); sendErr != nil {
+			return sendErr
 		}
 	}
-	names := make([]string, 0, len(nameSet))
-	for name := range nameSet {
-		names = append(names, name)
+	return nil
+}
+
+// filterTags returns, in their original order, the tags whose keys appear in
+// projection.
+func filterTags(tags []*modelv1.Tag, projection []string) []*modelv1.Tag {
+	allowed := make(map[string]struct{}, len(projection))
+	for _, key := range projection {
+		allowed[key] = struct{}{}
+	}
+	filtered := make([]*modelv1.Tag, 0, len(projection))
+	for _, tag := range tags {
+		if _, ok := allowed[tag.GetKey()]; ok {
+			filtered = append(filtered, tag)
+		}
 	}
-	return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+	return filtered
 }
func errInvalidRequest(msg string) error {
diff --git a/banyand/metadata/schema/schemaserver/grpc_test.go
b/banyand/metadata/schema/schemaserver/grpc_test.go
index 3de1812e6..dd6a7eb32 100644
--- a/banyand/metadata/schema/schemaserver/grpc_test.go
+++ b/banyand/metadata/schema/schemaserver/grpc_test.go
@@ -397,39 +397,152 @@ func TestRepairSchema(t *testing.T) {
})
}
-func TestAggregateSchemaUpdates(t *testing.T) {
-	t.Run("nil query", func(t *testing.T) {
-		_, upd := startTestServer(t)
-		_, rpcErr := upd.AggregateSchemaUpdates(context.Background(), &schemav1.AggregateSchemaUpdatesRequest{})
-		require.Error(t, rpcErr)
-		assert.Equal(t, codes.InvalidArgument, grpcstatus.Code(rpcErr))
-		assert.Contains(t, rpcErr.Error(), "query is required")
-	})
-	t.Run("empty results", func(t *testing.T) {
-		_, upd := startTestServer(t)
-		resp, rpcErr := upd.AggregateSchemaUpdates(context.Background(), &schemav1.AggregateSchemaUpdatesRequest{
-			Query: &propertyv1.QueryRequest{Groups: []string{schema.SchemaGroup}, Name: "nonexistent"},
-		})
-		require.NoError(t, rpcErr)
-		assert.Empty(t, resp.Names)
-	})
-	t.Run("single result", func(t *testing.T) {
+// TestWatchSchemas_BiDirReplay exercises the WatchSchemas bidi stream:
+// full replay, tag-projected (metadata-only) replay, live events after replay,
+// multiple replay requests on one stream, and replay of deleted entries.
+func TestWatchSchemas_BiDirReplay(t *testing.T) {
+	// NOTE(review): despite its name, this subtest applies no revision filter.
+	// The request sent below is empty, so the server replays every stored property.
+	t.Run("replay_from_revision", func(t *testing.T) {
 		mgmt, upd := startTestServer(t)
+		// Insert a property.
 		insertProperty(t, mgmt, "svc-a", "id1")
-		resp, rpcErr := upd.AggregateSchemaUpdates(context.Background(), &schemav1.AggregateSchemaUpdatesRequest{
-			Query: &propertyv1.QueryRequest{Groups: []string{schema.SchemaGroup}, Name: "svc-a"},
+		// Pause briefly before the list check and the second insert below.
+		time.Sleep(10 * time.Millisecond)
+		// Confirm the first property can be listed back by group, name and ID.
+		stream, streamErr := mgmt.ListSchemas(context.Background(), &schemav1.ListSchemasRequest{
+			Query: &propertyv1.QueryRequest{Groups: []string{schema.SchemaGroup}, Name: "svc-a", Ids: []string{"id1"}},
 		})
-		require.NoError(t, rpcErr)
-		assert.Equal(t, []string{"svc-a"}, resp.Names)
+		require.NoError(t, streamErr)
+		responses, recvErr := collectListResponses(stream)
+		require.NoError(t, recvErr)
+		require.Len(t, responses, 1)
+		// Insert a second property after the first.
+		insertProperty(t, mgmt, "svc-b", "id2")
+		// Start a bidi watch and send an empty request: with no criteria the
+		// server replays every stored schema property.
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		watchStream, watchErr := upd.WatchSchemas(ctx)
+		require.NoError(t, watchErr)
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{}))
+		// Collect events until REPLAY_DONE.
+		var events []*schemav1.WatchSchemasResponse
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+			events = append(events, resp)
+		}
+		assert.GreaterOrEqual(t, len(events), 2, "should replay all properties")
 	})
-	t.Run("duplicates deduped", func(t *testing.T) {
+
+	t.Run("metadata_only_replay", func(t *testing.T) {
 		mgmt, upd := startTestServer(t)
-		insertProperty(t, mgmt, "svc-a", "id1")
-		insertProperty(t, mgmt, "svc-a", "id2")
-		resp, rpcErr := upd.AggregateSchemaUpdates(context.Background(), &schemav1.AggregateSchemaUpdatesRequest{
-			Query: &propertyv1.QueryRequest{Groups: []string{schema.SchemaGroup}, Name: "svc-a"},
+		insertProperty(t, mgmt, "svc-meta", "id-meta")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		watchStream, watchErr := upd.WatchSchemas(ctx)
+		require.NoError(t, watchErr)
+		// A non-empty TagProjection requests a metadata-only replay.
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{
+			TagProjection: []string{"kind", "group", "name", "updated_at"},
+		}))
+		var events []*schemav1.WatchSchemasResponse
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+			events = append(events, resp)
+		}
+		require.NotEmpty(t, events)
+		for _, evt := range events {
+			assert.True(t, evt.MetadataOnly, "replay events should have metadata_only=true")
+			// Verify that only projected tags are returned, not the full property tags.
+			allowedKeys := map[string]bool{"kind": true, "group": true, "name": true, "updated_at": true}
+			for _, tag := range evt.Property.GetTags() {
+				assert.True(t, allowedKeys[tag.GetKey()],
+					"unexpected tag key %q in metadata_only replay; only projected tags should be returned", tag.GetKey())
+			}
+		}
+	})
+
+	t.Run("live_events_after_replay", func(t *testing.T) {
+		mgmt, upd := startTestServer(t)
+		insertProperty(t, mgmt, "svc-live", "id-live")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		watchStream, watchErr := upd.WatchSchemas(ctx)
+		require.NoError(t, watchErr)
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{}))
+		// Drain replay.
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+		}
+		// Insert new data - should come through as live event.
+		insertProperty(t, mgmt, "svc-new", "id-new")
+		resp, err := watchStream.Recv()
+		require.NoError(t, err)
+		assert.Equal(t, schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, resp.EventType)
+		assert.Equal(t, "id-new", resp.Property.GetId())
+	})
+
+	t.Run("multiple_requests", func(t *testing.T) {
+		mgmt, upd := startTestServer(t)
+		insertProperty(t, mgmt, "svc-multi", "id-multi")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		watchStream, watchErr := upd.WatchSchemas(ctx)
+		require.NoError(t, watchErr)
+		// First request.
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{}))
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+		}
+		// Second request on same stream (metadata only).
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{
+			TagProjection: []string{"kind", "group", "name", "updated_at"},
+		}))
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+			assert.True(t, resp.MetadataOnly)
+		}
+	})
+
+	t.Run("replay_includes_deleted", func(t *testing.T) {
+		mgmt, upd := startTestServer(t)
+		insertProperty(t, mgmt, "svc-del", "id-del")
+		_, deleteErr := mgmt.DeleteSchema(context.Background(), &schemav1.DeleteSchemaRequest{
+			Delete:   &propertyv1.DeleteRequest{Group: schema.SchemaGroup, Name: "svc-del", Id: "id-del"},
+			UpdateAt: timestamppb.Now(),
 		})
-		require.NoError(t, rpcErr)
-		assert.Equal(t, []string{"svc-a"}, resp.Names)
+		require.NoError(t, deleteErr)
+		// The watch is opened after the delete, so any DELETE event seen below
+		// comes from replay rather than from a live broadcast.
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		watchStream, watchErr := upd.WatchSchemas(ctx)
+		require.NoError(t, watchErr)
+		require.NoError(t, watchStream.Send(&schemav1.WatchSchemasRequest{}))
+		var hasDeleteEvent bool
+		for {
+			resp, err := watchStream.Recv()
+			require.NoError(t, err)
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE {
+				break
+			}
+			if resp.EventType == schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE && resp.Property.GetId() == "id-del" {
+				hasDeleteEvent = true
+				assert.Greater(t, resp.DeleteTime, int64(0), "delete event should carry a non-zero delete_time")
+			}
+		}
+		assert.True(t, hasDeleteEvent, "replay should include deleted entries as DELETE events")
 	})
 }
diff --git a/banyand/metadata/schema/schemaserver/service.go
b/banyand/metadata/schema/schemaserver/service.go
index 4f613faf2..8944d93db 100644
--- a/banyand/metadata/schema/schemaserver/service.go
+++ b/banyand/metadata/schema/schemaserver/service.go
@@ -80,6 +80,7 @@ type server struct {
l *logger.Logger
ser *grpclib.Server
tlsReloader *pkgtls.Reloader
+ watchers *watcherManager
schemaService *schemaManagementServer
updateService *schemaUpdateServer
host string
@@ -114,6 +115,14 @@ func (s *server) GetPort() *uint32 {
return &s.port
}
+// WatcherCount reports how many WatchSchemas streams are currently subscribed.
+// It is zero until PreRun has created the watcher manager.
+func (s *server) WatcherCount() int {
+	if wm := s.watchers; wm != nil {
+		return wm.Count()
+	}
+	return 0
+}
+
// RegisterGossip registers the DB's repair gRPC services with the gossip
messenger.
func (s *server) RegisterGossip(messenger gossip.Messenger) {
s.db.RegisterGossip(messenger)
@@ -176,17 +185,20 @@ func (s *server) PreRun(_ context.Context) error {
s.l = logger.GetLogger("schema-server")
s.lfs = fs.NewLocalFileSystem()
+ s.watchers = newWatcherManager(s.l)
grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
sm := newServerMetrics(grpcFactory)
s.schemaService = &schemaManagementServer{
- server: s,
- l: s.l,
- metrics: sm,
+ server: s,
+ watchers: s.watchers,
+ l: s.l,
+ metrics: sm,
}
s.updateService = &schemaUpdateServer{
- server: s,
- l: s.l,
- metrics: sm,
+ server: s,
+ watchers: s.watchers,
+ l: s.l,
+ metrics: sm,
}
if s.tls {
@@ -307,6 +319,9 @@ func (s *server) Serve() run.StopNotify {
}
func (s *server) GracefulStop() {
+ if s.watchers != nil {
+ s.watchers.Close()
+ }
if s.tlsReloader != nil {
s.tlsReloader.Stop()
}
diff --git a/banyand/metadata/schema/schemaserver/watcher.go
b/banyand/metadata/schema/schemaserver/watcher.go
new file mode 100644
index 000000000..ab726c866
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/watcher.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "sync"
+
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// watcherChannelSize is the per-watcher event buffer; Broadcast drops events
+// for a watcher whose buffer is full.
+const watcherChannelSize = 512
+
+// watcherManager fans schema change events out to every subscribed watcher.
+// Each watcher owns its own buffered channel.
+type watcherManager struct {
+	l        *logger.Logger
+	watchers map[uint64]chan *schemav1.WatchSchemasResponse
+	mu       sync.RWMutex
+	nextID   uint64
+	closed   bool // set by Close; consulted by Subscribe
+}
+
+// newWatcherManager returns an empty, open watcherManager that logs through l.
+func newWatcherManager(l *logger.Logger) *watcherManager {
+	return &watcherManager{
+		l:        l,
+		watchers: make(map[uint64]chan *schemav1.WatchSchemasResponse),
+	}
+}
+
+// Subscribe registers a new watcher and returns its ID and event channel.
+// Callers should pass the ID to Unsubscribe when done. Otherwise the watcher
+// stays registered until Close.
+//
+// If Close has already run, the watcher is not registered and the returned
+// channel is already closed. The caller then sees end-of-stream at once rather
+// than waiting forever on a channel nothing would ever close. Without this, a
+// stream opened during shutdown could outlive GracefulStop's call to Close.
+func (wm *watcherManager) Subscribe() (uint64, <-chan *schemav1.WatchSchemasResponse) {
+	wm.mu.Lock()
+	defer wm.mu.Unlock()
+	wm.nextID++
+	id := wm.nextID
+	if wm.closed {
+		ch := make(chan *schemav1.WatchSchemasResponse)
+		close(ch)
+		wm.l.Debug().Uint64("watcherID", id).Msg("watcher subscribed after close, returning closed channel")
+		return id, ch
+	}
+	ch := make(chan *schemav1.WatchSchemasResponse, watcherChannelSize)
+	wm.watchers[id] = ch
+	wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", len(wm.watchers)).Msg("watcher subscribed")
+	return id, ch
+}
+
+// Unsubscribe removes a watcher and closes its channel. Unknown or
+// already-removed IDs are ignored, so it is safe to call after Close.
+func (wm *watcherManager) Unsubscribe(id uint64) {
+	wm.mu.Lock()
+	defer wm.mu.Unlock()
+	if ch, ok := wm.watchers[id]; ok {
+		delete(wm.watchers, id)
+		close(ch)
+		wm.l.Debug().Uint64("watcherID", id).Int("totalWatchers", len(wm.watchers)).Msg("watcher unsubscribed")
+	}
+}
+
+// Broadcast offers resp to every registered watcher without blocking.
+// A watcher whose buffer is full misses the event. The drop is logged at warn
+// level and the event is not retried.
+// NOTE(review): a missed event leaves that client's view stale until it next
+// requests a replay — confirm clients can detect and recover from this.
+func (wm *watcherManager) Broadcast(resp *schemav1.WatchSchemasResponse) {
+	// The read lock is held for the whole fan-out. Unsubscribe and Close take
+	// the write lock before closing a channel, so no send below can hit a
+	// closed channel.
+	wm.mu.RLock()
+	defer wm.mu.RUnlock()
+	var propertyID string
+	if prop := resp.GetProperty(); prop != nil {
+		propertyID = prop.GetId()
+	}
+	wm.l.Debug().Stringer("eventType", resp.GetEventType()).
+		Str("propertyID", propertyID).
+		Int("watcherCount", len(wm.watchers)).
+		Msg("broadcasting schema event")
+	for id, ch := range wm.watchers {
+		select {
+		case ch <- resp:
+		default:
+			wm.l.Warn().Uint64("watcherID", id).
+				Stringer("eventType", resp.GetEventType()).
+				Str("propertyID", propertyID).
+				Msg("watcher channel full, dropping event")
+		}
+	}
+}
+
+// Count returns the number of registered watchers.
+func (wm *watcherManager) Count() int {
+	wm.mu.RLock()
+	defer wm.mu.RUnlock()
+	return len(wm.watchers)
+}
+
+// Close marks the manager closed, removes all watchers and closes their
+// channels. It may be called more than once. Afterwards Subscribe returns
+// closed channels and Broadcast reaches no one.
+func (wm *watcherManager) Close() {
+	wm.mu.Lock()
+	defer wm.mu.Unlock()
+	wm.closed = true
+	for id, ch := range wm.watchers {
+		delete(wm.watchers, id)
+		close(ch)
+	}
+}
diff --git a/banyand/metadata/schema/schemaserver/watcher_test.go
b/banyand/metadata/schema/schemaserver/watcher_test.go
new file mode 100644
index 000000000..57991d09c
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/watcher_test.go
@@ -0,0 +1,115 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// TestWatcherManager_SubscribeUnsubscribe checks three things: subscriptions
+// get distinct IDs, Unsubscribe closes only the removed watcher's channel, and
+// the remaining watcher still receives broadcasts.
+func TestWatcherManager_SubscribeUnsubscribe(t *testing.T) {
+	wm := newWatcherManager(logger.GetLogger("test-watcher"))
+	defer wm.Close()
+	id1, ch1 := wm.Subscribe()
+	id2, ch2 := wm.Subscribe()
+	assert.NotEqual(t, id1, id2)
+	assert.NotNil(t, ch1)
+	assert.NotNil(t, ch2)
+	wm.Unsubscribe(id1)
+	// ch1 should be closed: the receive returns at once with ok == false.
+	_, ok := <-ch1
+	assert.False(t, ok)
+	// ch2 should still be open
+	wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT})
+	// Bounded wait so a missing delivery fails the test instead of hanging it.
+	select {
+	case resp := <-ch2:
+		assert.Equal(t, schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT, resp.GetEventType())
+	case <-time.After(time.Second):
+		t.Fatal("expected event on ch2")
+	}
+}
+
+// TestWatcherManager_Broadcast checks that a single Broadcast reaches every
+// subscribed watcher.
+func TestWatcherManager_Broadcast(t *testing.T) {
+	wm := newWatcherManager(logger.GetLogger("test-watcher"))
+	defer wm.Close()
+	const numWatchers = 5
+	channels := make([]<-chan *schemav1.WatchSchemasResponse, numWatchers)
+	for i := range numWatchers {
+		_, channels[i] = wm.Subscribe()
+	}
+	resp := &schemav1.WatchSchemasResponse{EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE}
+	wm.Broadcast(resp)
+	for i, ch := range channels {
+		// Bounded wait so a missing delivery fails the test instead of hanging it.
+		select {
+		case got := <-ch:
+			assert.Equal(t, schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UPDATE, got.GetEventType(), "watcher %d", i)
+		case <-time.After(time.Second):
+			t.Fatalf("watcher %d did not receive event", i)
+		}
+	}
+}
+
+// TestWatcherManager_BroadcastNonBlocking checks that Broadcast returns
+// promptly when a watcher's buffer is full. The overflow event must be dropped
+// rather than queued.
+func TestWatcherManager_BroadcastNonBlocking(t *testing.T) {
+	wm := newWatcherManager(logger.GetLogger("test-watcher"))
+	defer wm.Close()
+	_, ch := wm.Subscribe()
+	// Fill the channel
+	for range watcherChannelSize {
+		wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_INSERT})
+	}
+	// This broadcast should not block even though the channel is full.
+	// It runs on a goroutine so that a blocking Broadcast is caught by the
+	// timeout below instead of deadlocking the test.
+	done := make(chan struct{})
+	go func() {
+		wm.Broadcast(&schemav1.WatchSchemasResponse{EventType: schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE})
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("Broadcast blocked on full channel")
+	}
+	// Drain without blocking and verify we got exactly watcherChannelSize
+	// events; the extra DELETE broadcast must not have been buffered.
+	count := 0
+	for range watcherChannelSize {
+		select {
+		case <-ch:
+			count++
+		default:
+		}
+	}
+	assert.Equal(t, watcherChannelSize, count)
+}
+
+// TestWatcherManager_Close checks that Close closes every watcher channel and
+// that a later Broadcast does not panic.
+func TestWatcherManager_Close(t *testing.T) {
+	wm := newWatcherManager(logger.GetLogger("test-watcher"))
+	_, ch1 := wm.Subscribe()
+	_, ch2 := wm.Subscribe()
+	wm.Close()
+	// Receives on closed, empty channels return immediately with ok == false.
+	_, ok1 := <-ch1
+	require.False(t, ok1, "ch1 should be closed")
+	_, ok2 := <-ch2
+	require.False(t, ok2, "ch2 should be closed")
+	// Broadcast after close should not panic
+	wm.Broadcast(&schemav1.WatchSchemasResponse{})
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 650a060bc..e208b001c 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -360,8 +360,6 @@
- [RepairService](#banyandb-property-v1-RepairService)
- [banyandb/schema/v1/internal.proto](#banyandb_schema_v1_internal-proto)
-  - [AggregateSchemaUpdatesRequest](#banyandb-schema-v1-AggregateSchemaUpdatesRequest)
-  - [AggregateSchemaUpdatesResponse](#banyandb-schema-v1-AggregateSchemaUpdatesResponse)
- [DeleteSchemaRequest](#banyandb-schema-v1-DeleteSchemaRequest)
- [DeleteSchemaResponse](#banyandb-schema-v1-DeleteSchemaResponse)
- [InsertSchemaRequest](#banyandb-schema-v1-InsertSchemaRequest)
@@ -372,6 +370,10 @@
- [RepairSchemaResponse](#banyandb-schema-v1-RepairSchemaResponse)
- [UpdateSchemaRequest](#banyandb-schema-v1-UpdateSchemaRequest)
- [UpdateSchemaResponse](#banyandb-schema-v1-UpdateSchemaResponse)
+ - [WatchSchemasRequest](#banyandb-schema-v1-WatchSchemasRequest)
+ - [WatchSchemasResponse](#banyandb-schema-v1-WatchSchemasResponse)
+
+ - [SchemaEventType](#banyandb-schema-v1-SchemaEventType)
- [SchemaManagementService](#banyandb-schema-v1-SchemaManagementService)
- [SchemaUpdateService](#banyandb-schema-v1-SchemaUpdateService)
@@ -5440,36 +5442,6 @@ WriteResponse is the response contract for write
-<a name="banyandb-schema-v1-AggregateSchemaUpdatesRequest"></a>
-
-### AggregateSchemaUpdatesRequest
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| query |
[banyandb.property.v1.QueryRequest](#banyandb-property-v1-QueryRequest) | | |
-
-
-
-
-
-
-<a name="banyandb-schema-v1-AggregateSchemaUpdatesResponse"></a>
-
-### AggregateSchemaUpdatesResponse
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| names | [string](#string) | repeated | which names/schemas has updates |
-
-
-
-
-
-
<a name="banyandb-schema-v1-DeleteSchemaRequest"></a>
### DeleteSchemaRequest
@@ -5607,8 +5579,57 @@ WriteResponse is the response contract for write
+
+<a name="banyandb-schema-v1-WatchSchemasRequest"></a>
+
+### WatchSchemasRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | | |
+| tag_projection | [string](#string) | repeated | |
+
+
+
+
+
+
+<a name="banyandb-schema-v1-WatchSchemasResponse"></a>
+
+### WatchSchemasResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| event_type | [SchemaEventType](#banyandb-schema-v1-SchemaEventType) | | |
+| property | [banyandb.property.v1.Property](#banyandb-property-v1-Property) | | |
+| metadata_only | [bool](#bool) | | |
+| delete_time | [int64](#int64) | | delete_time is the deletion timestamp in nanoseconds. 0 means not deleted, >0 means the property was deleted at this time. |
+
+
+
+
+
+
+<a name="banyandb-schema-v1-SchemaEventType"></a>
+
+### SchemaEventType
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| SCHEMA_EVENT_TYPE_UNSPECIFIED | 0 | |
+| SCHEMA_EVENT_TYPE_INSERT | 1 | |
+| SCHEMA_EVENT_TYPE_UPDATE | 2 | |
+| SCHEMA_EVENT_TYPE_DELETE | 3 | |
+| SCHEMA_EVENT_TYPE_REPLAY_DONE | 4 | |
+
+
@@ -5635,7 +5656,7 @@ WriteResponse is the response contract for write
| Method Name | Request Type | Response Type | Description |
| ----------- | ------------ | ------------- | ------------|
-| AggregateSchemaUpdates | [AggregateSchemaUpdatesRequest](#banyandb-schema-v1-AggregateSchemaUpdatesRequest) | [AggregateSchemaUpdatesResponse](#banyandb-schema-v1-AggregateSchemaUpdatesResponse) | |
+| WatchSchemas | [WatchSchemasRequest](#banyandb-schema-v1-WatchSchemasRequest) stream | [WatchSchemasResponse](#banyandb-schema-v1-WatchSchemasResponse) stream | |
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index b053b9363..f56c38c43 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -460,7 +460,6 @@ func standaloneServerWithAuth(config *ClusterConfig, path
string, ports []int, s
ff = append(ff,
"--schema-server-grpc-host=127.0.0.1",
fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
- "--schema-property-client-sync-interval=300ms",
)
config.addSchemaServerAddr(schemaAddr)
} else {
@@ -654,7 +653,6 @@ func startDataNode(config *ClusterConfig, dataDir string,
flags ...string) (stri
flags = append(flags,
"--schema-server-grpc-host="+nodeHost,
fmt.Sprintf("--schema-server-grpc-port=%d", schemaPort),
- "--schema-property-client-sync-interval=300ms",
)
config.addSchemaServerAddr(schemaAddr)
} else {
@@ -769,9 +767,7 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags
...string) (string, string
flags = append(flags,
fmt.Sprintf("--node-discovery-file-path=%s",
config.NodeDiscovery.FileWriter.Path()))
}
- if isPropertyMode {
- flags = append(flags, "--schema-property-client-sync-interval",
"1s")
- } else {
+ if !isPropertyMode {
flags = append(flags, "--etcd-endpoints", config.EtcdEndpoint)
}
closeFn := CMD(flags...)
diff --git a/test/e2e-v2/cases/cluster/docker-compose-property.yml
b/test/e2e-v2/cases/cluster/docker-compose-property.yml
index 6d390b226..ede0538c6 100644
--- a/test/e2e-v2/cases/cluster/docker-compose-property.yml
+++ b/test/e2e-v2/cases/cluster/docker-compose-property.yml
@@ -27,7 +27,7 @@ services:
extends:
file: ../../script/docker-compose/base-compose.yml
service: data
- command: data --etcd-endpoints=http://etcd:2379
--schema-registry-mode=property --schema-property-client-sync-interval=10s
+ command: data --etcd-endpoints=http://etcd:2379
--schema-registry-mode=property
networks:
- e2e
@@ -35,7 +35,7 @@ services:
extends:
file: ../../script/docker-compose/base-compose.yml
service: liaison
- command: liaison --etcd-endpoints=http://etcd:2379
--schema-registry-mode=property --schema-property-client-sync-interval=10s
+ command: liaison --etcd-endpoints=http://etcd:2379
--schema-registry-mode=property
networks:
- e2e
diff --git a/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
b/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
index dfaacc9d8..30bc73cb9 100644
--- a/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/event/banyandb/docker-compose-property.yml
@@ -18,7 +18,7 @@ services:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: banyandb
- command: standalone --schema-registry-mode=property
--schema-property-client-sync-interval=10s
+ command: standalone --schema-registry-mode=property
networks:
- e2e
diff --git
a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
index 3844c1405..9c6d6f74d 100644
--- a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose-property.yml
@@ -20,7 +20,7 @@ services:
extends:
file: ../../../../script/docker-compose/base-compose.yml
service: banyandb
- command: standalone --schema-registry-mode=property
--schema-property-client-sync-interval=10s
+ command: standalone --schema-registry-mode=property
networks:
- e2e
diff --git a/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
b/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
index 00f811d30..5613290fb 100644
--- a/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
+++ b/test/e2e-v2/cases/storage/banyandb/docker-compose-property.yml
@@ -20,7 +20,7 @@ services:
extends:
file: ../../../script/docker-compose/base-compose.yml
service: banyandb
- command: standalone --schema-registry-mode=property
--schema-property-client-sync-interval=10s
+ command: standalone --schema-registry-mode=property
networks:
- e2e
diff --git a/test/integration/distributed/lifecycle/property/suite_test.go
b/test/integration/distributed/lifecycle/property/suite_test.go
index dca545eb1..22058909e 100644
--- a/test/integration/distributed/lifecycle/property/suite_test.go
+++ b/test/integration/distributed/lifecycle/property/suite_test.go
@@ -91,7 +91,6 @@ func init() {
"--schema-registry-mode=property",
"--node-discovery-mode=file",
fmt.Sprintf("--node-discovery-file-path=%s",
dfWriter.Path()),
- "--schema-property-client-sync-interval=300ms",
},
StopFunc: func() {
closerLiaisonNode()