Copilot commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904714137


##########
banyand/metadata/schema/property/client.go:
##########
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
        }
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(nodeName, client.update)
+               }()
+       }
+}

Review Comment:
   `launchWatch` can be called multiple times for the same node 
(reconnects/health flaps), and each call starts a new long-lived `watchLoop` 
with no per-node lifecycle. This can lead to duplicated processing and 
goroutine/log leaks. Please deduplicate per node (e.g., map[nodeName]cancel, 
only one watch per node) and stop the watch when the node becomes 
inactive/removed or when `Execute(nodeName, ...)` reports no active client.



##########
banyand/metadata/schema/property/client_test.go:
##########
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string, 
func()) {
                }
                time.Sleep(50 * time.Millisecond)
        }
-       return addr, func() { srv.GracefulStop() }
+       return srv, addr, func() { srv.GracefulStop() }
 }

Review Comment:
   `startTestSchemaServerFull` both registers `t.Cleanup(func() { 
srv.GracefulStop() })` and returns a `stopFn` that also calls 
`srv.GracefulStop()`. Callers like `TestOnDelete_RemoveNode` invoke the 
returned `stopFn`, so the server is stopped twice, which can lead to flaky 
behavior depending on gRPC server stop idempotency. Consider removing the 
internal `t.Cleanup` when returning a stop function, or make the returned stop 
function idempotent (e.g., `sync.Once`).



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -70,6 +70,7 @@ type Server interface {
        run.Service
        GetPort() *uint32
        RegisterGossip(messenger gossip.Messenger)
+       WatcherCount() int
 }

Review Comment:
   Adding `WatcherCount()` to the exported `schemaserver.Server` interface is 
an API-breaking change for any external consumers and also introduces what 
looks like test-only introspection into a production interface (it’s only used 
in tests right now). If this is only needed for tests/diagnostics, consider 
exposing it on the concrete server type (or via a separate optional interface) 
instead of extending the main exported interface.



##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx 
context.Context, req *schemav1
                return &schemav1.DeleteSchemaResponse{Found: false}, nil
        }
        ids := make([][]byte, 0, len(results))
+       var deletedProps []*propertyv1.Property
        for _, result := range results {
                if result.DeleteTime() > 0 {
                        continue
                }
                ids = append(ids, result.ID())
+               var p propertyv1.Property
+               if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr == nil {
+                       deletedProps = append(deletedProps, &p)
+               }

Review Comment:
   In `DeleteSchema`, unmarshal errors are silently ignored 
(`protojson.Unmarshal` failures just skip adding to `deletedProps`). That means 
the server can successfully delete a schema but never broadcast a delete event 
for it, leaving watch-based clients stuck with stale cache. Consider treating 
unmarshal failures as an error (fail the delete) or at least logging and 
broadcasting a minimal delete event that still allows clients to evict by 
ID/revision.



##########
banyand/metadata/schema/property/client.go:
##########
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
        }
 }
 
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+       if r.closer.AddRunning() {
+               go func() {
+                       defer r.closer.Done()
+                       r.watchLoop(nodeName, client.update)
+               }()
+       }
+}
+
+const (
+       watchInitialBackoff = time.Second
+       watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient 
schemav1.SchemaUpdateServiceClient) {
+       backoff := watchInitialBackoff
+       ctx := r.closer.Ctx()
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+               err := r.consumeWatchStream(ctx, updateClient)
+               if err != nil {
+                       st, ok := status.FromError(err)
+                       if ok && st.Code() == codes.Unimplemented {
+                               r.l.Debug().Str("node", nodeName).Msg("watch: 
server does not support WatchSchemas, stopping watch")
+                               return
+                       }
+                       r.l.Warn().Err(err).Str("node", 
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff *= 2
+               if backoff > watchMaxBackoff {
+                       backoff = watchMaxBackoff
+               }
+       }

Review Comment:
   `watchLoop` retries forever using a client bound to the original gRPC 
connection. If the connection is closed and the node is removed from 
`ConnManager`, this loop will continue retrying indefinitely (it only stops 
when the whole registry closes), which is both noisy and wasteful. Tie the 
watch attempt to the current active connection (e.g., call `WatchSchemas` 
inside `connMgr.Execute(nodeName, ...)` each retry) and exit when the node is 
no longer active/registered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to