Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


hanahmily merged PR #995:
URL: https://github.com/apache/skywalking-banyandb/pull/995


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909956253


##
banyand/metadata/schema/property/client.go:
##
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
-   r.performSync(ctx)
+   r.syncRound++
+   if r.syncRound%r.fullReconcileEvery == 0 {
+   r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+   r.performFullSync(ctx)
+   } else {
+   r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+   r.performIncrementalSync(ctx)
+   }
}
}
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-   round := atomic.AddUint64(&r.syncRound, 1)
-   isFullReconcile := r.shouldFullReconcile(round)
-   names := r.connMgr.ActiveNames()
-   if len(names) == 0 {
-   r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-   return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   r.watchMu.Lock()
+   defer r.watchMu.Unlock()
+   if old, exists := r.watchSessions[nodeName]; exists {
+   r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+   old.cancelFn()
+   delete(r.watchSessions, nodeName)
+   }
+   ctx, cancel := context.WithCancel(r.closer.Ctx())
+   session := &watchSession{
+   cancelFn:  cancel,
+   syncReqCh: make(chan *syncRequest, 1),
+   syncResCh: make(chan []*digestEntry, 1),
}
-   r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-   Int("activeNodes", len(names)).Msg("performSync: starting")
-   collector := newSyncCollector()
-   if isFullReconcile {
-   r.collectFullSync(ctx, collector)
+   r.watchSessions[nodeName] = session
+   r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(ctx, nodeName, client.update, session)
+   }()
} else {
-   r.collectIncrementalSync(ctx, collector)
+   cancel()
+   delete(r.watchSessions, nodeName)
}
-   r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-   r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-   kindsToSync := r.getKindsToSync()
-   if len(kindsToSync) == 0 {
-   return
-   }
-   broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-   for _, kind := range kindsToSync {
-   query := buildSchemaQuery(kind, "", "", 0)
-   schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-   if queryErr != nil {
-   return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-   }
-   if r.l.Debug().Enabled() {
-   activeSchemas := 0
-   deletedSchemas := 0
-   for _, s := range schemas {
-   if s.deleteTime > 0 {
-   deletedSchemas++
-   continue
-   }
-   activeSchemas++
-   }
-   r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-   Int("schemas", len(schemas)).
-   Int("activeSchemas", activeSchemas).
-   Int("deletedSchemas", deletedSchemas).
-   Msg("collectFullSync: collected")
-   }
-   collector.add(nodeName, kind, schemas)
-   }
-   return nil
-   })
-   if broadcastErr != nil {
-   r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+   r.watchMu.Lock()
+   defer r.watchMu.Unlock()
+   if session, exists := r.watchSessions[nodeName]; exists {
+ 

Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909954608


##
banyand/metadata/schema/property/client.go:
##
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) 
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
-   r.performSync(ctx)
+   r.syncRound++
+   if r.syncRound%r.fullReconcileEvery == 0 {
+   r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting full reconcile")
+   r.performFullSync(ctx)
+   } else {
+   r.l.Debug().Uint64("round", 
r.syncRound).Msg("syncLoop: starting incremental sync")
+   r.performIncrementalSync(ctx)
+   }
}
}
 }
 
-func (r *SchemaRegistry) performSync(ctx context.Context) {
-   round := atomic.AddUint64(&r.syncRound, 1)
-   isFullReconcile := r.shouldFullReconcile(round)
-   names := r.connMgr.ActiveNames()
-   if len(names) == 0 {
-   r.l.Debug().Uint64("round", round).Msg("performSync: no active 
connections, skipping")
-   return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   r.watchMu.Lock()
+   defer r.watchMu.Unlock()
+   if old, exists := r.watchSessions[nodeName]; exists {
+   r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing 
existing watch session")
+   old.cancelFn()
+   delete(r.watchSessions, nodeName)
+   }
+   ctx, cancel := context.WithCancel(r.closer.Ctx())
+   session := &watchSession{
+   cancelFn:  cancel,
+   syncReqCh: make(chan *syncRequest, 1),
+   syncResCh: make(chan []*digestEntry, 1),
}
-   r.l.Debug().Uint64("round", round).Bool("fullReconcile", 
isFullReconcile).
-   Int("activeNodes", len(names)).Msg("performSync: starting")
-   collector := newSyncCollector()
-   if isFullReconcile {
-   r.collectFullSync(ctx, collector)
+   r.watchSessions[nodeName] = session
+   r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch 
session")
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(ctx, nodeName, client.update, session)
+   }()
} else {
-   r.collectIncrementalSync(ctx, collector)
+   cancel()
+   delete(r.watchSessions, nodeName)
}
-   r.l.Debug().Uint64("round", round).Int("collectedKinds", 
len(collector.entries)).Msg("performSync: reconciling")
-   r.reconcileFromCollector(collector)
 }
 
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector 
*syncCollector) {
-   kindsToSync := r.getKindsToSync()
-   if len(kindsToSync) == 0 {
-   return
-   }
-   broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient) 
error {
-   for _, kind := range kindsToSync {
-   query := buildSchemaQuery(kind, "", "", 0)
-   schemas, queryErr := r.querySchemasFromClient(ctx, 
sc.management, query)
-   if queryErr != nil {
-   return fmt.Errorf("failed to query schemas for 
full sync for kind: %s", kind)
-   }
-   if r.l.Debug().Enabled() {
-   activeSchemas := 0
-   deletedSchemas := 0
-   for _, s := range schemas {
-   if s.deleteTime > 0 {
-   deletedSchemas++
-   continue
-   }
-   activeSchemas++
-   }
-   r.l.Debug().Stringer("kind", kind).Str("node", 
nodeName).
-   Int("schemas", len(schemas)).
-   Int("activeSchemas", activeSchemas).
-   Int("deletedSchemas", deletedSchemas).
-   Msg("collectFullSync: collected")
-   }
-   collector.add(nodeName, kind, schemas)
-   }
-   return nil
-   })
-   if broadcastErr != nil {
-   r.l.Error().Err(broadcastErr).Msg("failed to collect full sync 
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+   r.watchMu.Lock()
+   defer r.watchMu.Unlock()
+   if session, exists := r.watchSessions[nodeName]; exists {
+ 

Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909951174


##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
-   server  *server
-   l   *logger.Logger
-   metrics *serverMetrics
+   server   *server
+   watchers *watcherManager
+   l*logger.Logger
+   metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-   req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-   s.metrics.totalStarted.Inc(1, "aggregate")
-   start := time.Now()
-   defer func() {
-   s.metrics.totalFinished.Inc(1, "aggregate")
-   s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+   stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+   id, ch := s.watchers.Subscribe()
+   defer s.watchers.Unsubscribe(id)
+
+   reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+   reqErrCh := make(chan error, 1)
+   go func() {
+   for {
+   req, err := stream.Recv()
+   if err != nil {
+   reqErrCh <- err
+   close(reqCh)
+   return
+   }
+   reqCh <- req
+   }
}()
-   if req.Query == nil {
-   return nil, errInvalidRequest("query is required")
+
+   for {
+   select {
+   case <-stream.Context().Done():
+   return stream.Context().Err()
+   case err := <-reqErrCh:
+   return err
+   case req, ok := <-reqCh:
+   if !ok {
+   return nil
+   }
+   if replayErr := s.replaySchemas(stream, req); replayErr 
!= nil {
+   return replayErr
+   }
+   if doneErr := 
stream.Send(&schemav1.WatchSchemasResponse{
+   EventType: 
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+   }); doneErr != nil {
+   return doneErr
+   }
+   case resp, ok := <-ch:
+   if !ok {
+   return nil
+   }
+   if sendErr := stream.Send(resp); sendErr != nil {
+   return sendErr
+   }
+   }
}
-   req.Query.Groups = []string{schema.SchemaGroup}
-   results, queryErr := s.server.db.Query(ctx, req.Query)
-   if queryErr != nil {
-   s.metrics.totalErr.Inc(1, "aggregate")
-   s.l.Error().Err(queryErr).Msg("failed to aggregate schema 
updates")
-   return nil, queryErr
+}
+
+func (s *schemaUpdateServer) replaySchemas(
+   stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+   req *schemav1.WatchSchemasRequest,
+) error {
+   metadataOnly := len(req.TagProjection) > 0
+   query := &propertyv1.QueryRequest{
+   Groups:   []string{schema.SchemaGroup},
+   Criteria: req.Criteria,
+   }
+   results, err := s.server.db.Query(stream.Context(), query)
+   if err != nil {
+   return err
}
-   nameSet := make(map[string]struct{}, len(results))
for _, result := range results {
var p propertyv1.Property
if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr != nil {
-   s.metrics.totalErr.Inc(1, "aggregate")
-   return nil, unmarshalErr
+   s.l.Warn().Err(unmarshalErr).Msg("replay: failed to 
unmarshal property")
+   continue
+   }

Review Comment:
   Return an error when `Unmarshal` fails. 



##
banyand/metadata/schema/schemaserver/watcher.go:
##
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for add

Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909949752


##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
-   server  *server
-   l   *logger.Logger
-   metrics *serverMetrics
+   server   *server
+   watchers *watcherManager
+   l*logger.Logger
+   metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-   req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-   s.metrics.totalStarted.Inc(1, "aggregate")
-   start := time.Now()
-   defer func() {
-   s.metrics.totalFinished.Inc(1, "aggregate")
-   s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+   stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+   id, ch := s.watchers.Subscribe()
+   defer s.watchers.Unsubscribe(id)
+
+   reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+   reqErrCh := make(chan error, 1)
+   go func() {
+   for {
+   req, err := stream.Recv()
+   if err != nil {
+   reqErrCh <- err
+   close(reqCh)
+   return
+   }
+   reqCh <- req
+   }
}()
-   if req.Query == nil {
-   return nil, errInvalidRequest("query is required")
+
+   for {
+   select {
+   case <-stream.Context().Done():
+   return stream.Context().Err()
+   case err := <-reqErrCh:
+   return err
+   case req, ok := <-reqCh:

Review Comment:
   Added an io.EOF check. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-10 Thread via GitHub


Copilot commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909877731


##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
-   server  *server
-   l   *logger.Logger
-   metrics *serverMetrics
+   server   *server
+   watchers *watcherManager
+   l*logger.Logger
+   metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-   req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-   s.metrics.totalStarted.Inc(1, "aggregate")
-   start := time.Now()
-   defer func() {
-   s.metrics.totalFinished.Inc(1, "aggregate")
-   s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+   stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+   id, ch := s.watchers.Subscribe()
+   defer s.watchers.Unsubscribe(id)
+
+   reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+   reqErrCh := make(chan error, 1)
+   go func() {
+   for {
+   req, err := stream.Recv()
+   if err != nil {
+   reqErrCh <- err
+   close(reqCh)
+   return
+   }
+   reqCh <- req
+   }
}()
-   if req.Query == nil {
-   return nil, errInvalidRequest("query is required")
+
+   for {
+   select {
+   case <-stream.Context().Done():
+   return stream.Context().Err()
+   case err := <-reqErrCh:
+   return err
+   case req, ok := <-reqCh:

Review Comment:
   In WatchSchemas, when the client closes its send direction, stream.Recv() 
will return io.EOF. The current goroutine forwards that as an error, causing 
the RPC to return io.EOF instead of a clean nil termination. Handle io.EOF 
explicitly (return nil) to avoid surfacing it as a stream error.



##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
-   server  *server
-   l   *logger.Logger
-   metrics *serverMetrics
+   server   *server
+   watchers *watcherManager
+   l*logger.Logger
+   metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-   req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-   s.metrics.totalStarted.Inc(1, "aggregate")
-   start := time.Now()
-   defer func() {
-   s.metrics.totalFinished.Inc(1, "aggregate")
-   s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+   stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+   id, ch := s.watchers.Subscribe()
+   defer s.watchers.Unsubscribe(id)
+
+   reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+   reqErrCh := make(chan error, 1)
+   go func() {
+   for {
+   req, err := stream.Recv()
+   if err != nil {
+   reqErrCh <- err
+   close(reqCh)
+   return
+   }
+   reqCh <- req
+   }
}()
-   if req.Query == nil {
-   return nil, errInvalidRequest("query is required")
+
+   for {
+   select {
+   case <-stream.Context().Done():
+   return stream.Context().Err()
+   case err := <-reqErrCh:
+   

Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


hanahmily commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2905200697


##
api/proto/banyandb/schema/v1/internal.proto:
##
@@ -85,4 +85,19 @@ message AggregateSchemaUpdatesResponse {
 
 service SchemaUpdateService {
   rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns 
(AggregateSchemaUpdatesResponse);
+  rpc WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse);

Review Comment:
   I think keeping WatchSchemas as server-streaming (rpc 
WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse)) is the 
better fit for a “watch” API here.
   
   



##
api/proto/banyandb/schema/v1/internal.proto:
##
@@ -85,4 +85,19 @@ message AggregateSchemaUpdatesResponse {
 
 service SchemaUpdateService {
   rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns 
(AggregateSchemaUpdatesResponse);
+  rpc WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse);

Review Comment:
   Can the synchronization process also utilize the streaming service? If so, 
the AggregateSchemaUpdates can be eliminated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904821682


##
banyand/metadata/schema/schemaserver/service.go:
##
@@ -70,6 +70,7 @@ type Server interface {
run.Service
GetPort() *uint32
RegisterGossip(messenger gossip.Messenger)
+   WatcherCount() int
 }

Review Comment:
   Removed this function from the interface and added a function that the 
tests use for this check instead. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904819286


##
banyand/metadata/schema/property/client_test.go:
##
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string, 
func()) {
}
time.Sleep(50 * time.Millisecond)
}
-   return addr, func() { srv.GracefulStop() }
+   return srv, addr, func() { srv.GracefulStop() }
 }

Review Comment:
   add a new `sync.Once` to fix it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904818116


##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx 
context.Context, req *schemav1
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
ids := make([][]byte, 0, len(results))
+   var deletedProps []*propertyv1.Property
for _, result := range results {
if result.DeleteTime() > 0 {
continue
}
ids = append(ids, result.ID())
+   var p propertyv1.Property
+   if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr == nil {
+   deletedProps = append(deletedProps, &p)
+   }

Review Comment:
   If unmarshalling fails, return the error message. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904815712


##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(nodeName, client.update)
+   }()
+   }
+}
+
+const (
+   watchInitialBackoff = time.Second
+   watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient 
schemav1.SchemaUpdateServiceClient) {
+   backoff := watchInitialBackoff
+   ctx := r.closer.Ctx()
+   for {
+   select {
+   case <-ctx.Done():
+   return
+   default:
+   }
+   err := r.consumeWatchStream(ctx, updateClient)
+   if err != nil {
+   st, ok := status.FromError(err)
+   if ok && st.Code() == codes.Unimplemented {
+   r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+   return
+   }
+   r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+   }
+   select {
+   case <-ctx.Done():
+   return
+   case <-time.After(backoff):
+   }
+   backoff *= 2
+   if backoff > watchMaxBackoff {
+   backoff = watchMaxBackoff
+   }
+   }

Review Comment:
   Cancel the loop when the node connection has been shut down. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904808733


##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(nodeName, client.update)
+   }()
+   }
+}

Review Comment:
   Add a new map to cache all the launched watchers. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


codecov-commenter commented on PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#issuecomment-4022984203

   ## 
[Codecov](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   :white_check_mark: All modified and coverable lines are covered by tests.
   :white_check_mark: Project coverage is 30.04%. Comparing base 
([`3530dd9`](https://app.codecov.io/gh/apache/skywalking-banyandb/commit/3530dd9b5cefaa4fef60eec91ecbfbdfcaf243c5?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache))
 to head 
([`f156499`](https://app.codecov.io/gh/apache/skywalking-banyandb/commit/f156499500d2e0ebfd5fbc49ee1c6b2c9fee142e?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)).
   :warning: Report is 147 commits behind head on main.
   > :exclamation:  There is a different number of reports uploaded between 
BASE (3530dd9) and HEAD (f156499). Click for more details.
   > 
   > HEAD has 1 upload less than BASE
   >
   >| Flag | BASE (3530dd9) | HEAD (f156499) |
   >|--|--|--|
   >||1|0|
   >
   
   Additional details and impacted files
   
   
   
   ```diff
   @@ Coverage Diff @@
   ## main #995   +/-   ##
   ===
   - Coverage   45.97%   30.04%   -15.93% 
   ===
 Files 328  122  -206 
 Lines   5550512846-42659 
   ===
   - Hits25520 3860-21660 
   + Misses  27909 8614-19295 
   + Partials 2076  372 -1704 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[pkg](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `30.04% <ø> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
:rocket: New features to boost your workflow: 
   
   - :snowflake: [Test 
Analytics](https://docs.codecov.com/docs/test-analytics): Detect flaky tests, 
report on failures, and find test suite problems.
   - :package: [JS Bundle 
Analysis](https://docs.codecov.com/docs/javascript-bundle-analysis): Save 
yourself from yourself by tracking and limiting bundle sizes in JS merges.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support client watch the schema server updates [skywalking-banyandb]

2026-03-09 Thread via GitHub


Copilot commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904714137


##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(nodeName, client.update)
+   }()
+   }
+}

Review Comment:
   `launchWatch` can be called multiple times for the same node 
(reconnects/health flaps), and each call starts a new long-lived `watchLoop` 
with no per-node lifecycle. This can lead to duplicated processing and 
goroutine/log leaks. Please deduplicate per node (e.g., map[nodeName]cancel, 
only one watch per node) and stop the watch when the node becomes 
inactive/removed or when `Execute(nodeName, ...)` reports no active client.



##
banyand/metadata/schema/property/client_test.go:
##
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string, 
func()) {
}
time.Sleep(50 * time.Millisecond)
}
-   return addr, func() { srv.GracefulStop() }
+   return srv, addr, func() { srv.GracefulStop() }
 }

Review Comment:
   `startTestSchemaServerFull` both registers `t.Cleanup(func() { 
srv.GracefulStop() })` and returns a `stopFn` that also calls 
`srv.GracefulStop()`. Callers like `TestOnDelete_RemoveNode` invoke the 
returned `stopFn`, so the server is stopped twice, which can lead to flaky 
behavior depending on gRPC server stop idempotency. Consider removing the 
internal `t.Cleanup` when returning a stop function, or make the returned stop 
function idempotent (e.g., `sync.Once`).



##
banyand/metadata/schema/schemaserver/service.go:
##
@@ -70,6 +70,7 @@ type Server interface {
run.Service
GetPort() *uint32
RegisterGossip(messenger gossip.Messenger)
+   WatcherCount() int
 }

Review Comment:
   Adding `WatcherCount()` to the exported `schemaserver.Server` interface is 
an API-breaking change for any external consumers and also introduces what 
looks like test-only introspection into a production interface (it’s only used 
in tests right now). If this is only needed for tests/diagnostics, consider 
exposing it on the concrete server type (or via a separate optional interface) 
instead of extending the main exported interface.



##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx 
context.Context, req *schemav1
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
ids := make([][]byte, 0, len(results))
+   var deletedProps []*propertyv1.Property
for _, result := range results {
if result.DeleteTime() > 0 {
continue
}
ids = append(ids, result.ID())
+   var p propertyv1.Property
+   if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr == nil {
+   deletedProps = append(deletedProps, &p)
+   }

Review Comment:
   In `DeleteSchema`, unmarshal errors are silently ignored 
(`protojson.Unmarshal` failures just skip adding to `deletedProps`). That means 
the server can successfully delete a schema but never broadcast a delete event 
for it, leaving watch-based clients stuck with stale cache. Consider treating 
unmarshal failures as an error (fail the delete) or at least logging and 
broadcasting a minimal delete event that still allows clients to evict by 
ID/revision.



##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+   if r.closer.AddRunning() {
+   go func() {
+   defer r.closer.Done()
+   r.watchLoop(nodeName, client.update)
+   }()
+   }
+}
+
+const (
+   watchInitialBackoff = time.Second
+   watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient 
schemav1.SchemaUpdateServiceClient) {
+   backoff := watchInitialBackoff
+   ctx := r.closer.Ctx()
+   for {
+   select {
+   case <-ctx.Done():
+   return
+   default:
+   }
+   err := r.consumeWatchStream(ctx, updateClient)
+   if err != nil {
+   st, ok := status.FromError(err)
+   if ok && st.Code() == codes.Unimplemented {
+   r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not sup