Copilot commented on code in PR #995:
URL: https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904714137
##########
banyand/metadata/schema/property/client.go:
##########
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+	if r.closer.AddRunning() {
+		go func() {
+			defer r.closer.Done()
+			r.watchLoop(nodeName, client.update)
+		}()
+	}
+}
Review Comment:
`launchWatch` can be called multiple times for the same node
(reconnects/health flaps), and each call starts a new long-lived `watchLoop`
with no per-node lifecycle. This can lead to duplicated processing and
goroutine/log leaks. Please deduplicate per node (e.g., map[nodeName]cancel,
only one watch per node) and stop the watch when the node becomes
inactive/removed or when `Execute(nodeName, ...)` reports no active client.
##########
banyand/metadata/schema/property/client_test.go:
##########
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string, func()) {
		}
		time.Sleep(50 * time.Millisecond)
	}
-	return addr, func() { srv.GracefulStop() }
+	return srv, addr, func() { srv.GracefulStop() }
}
Review Comment:
`startTestSchemaServerFull` both registers `t.Cleanup(func() {
srv.GracefulStop() })` and returns a `stopFn` that also calls
`srv.GracefulStop()`. Callers like `TestOnDelete_RemoveNode` invoke the
returned `stopFn`, so the server is stopped twice, which can lead to flaky
behavior depending on gRPC server stop idempotency. Consider removing the
internal `t.Cleanup` when returning a stop function, or making the returned
stop function idempotent (e.g., `sync.Once`).
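
The `sync.Once` option can be sketched as a small wrapper; `idempotentStop` is an illustrative helper name, not code from the PR:

```go
package main

import "sync"

// idempotentStop wraps a stop function so that calling the result more than
// once (e.g., both from t.Cleanup and from the caller's stopFn) runs the
// underlying stop exactly once.
func idempotentStop(stop func()) func() {
	var once sync.Once
	return func() { once.Do(stop) }
}
```

The test helper could then return `idempotentStop(srv.GracefulStop)` and safely keep its internal `t.Cleanup` as well.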
##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -70,6 +70,7 @@ type Server interface {
	run.Service
	GetPort() *uint32
	RegisterGossip(messenger gossip.Messenger)
+	WatcherCount() int
}
Review Comment:
Adding `WatcherCount()` to the exported `schemaserver.Server` interface is
an API-breaking change for any external consumers and also introduces what
looks like test-only introspection into a production interface (it’s only used
in tests right now). If this is only needed for tests/diagnostics, consider
exposing it on the concrete server type (or via a separate optional interface)
instead of extending the main exported interface.
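
The optional-interface pattern looks roughly like this; `Server` here is a trimmed stand-in for `schemaserver.Server`, and `watcherCounter` and `grpcServer` are illustrative names:

```go
package main

// Server stands in for the exported interface, left unchanged.
type Server interface {
	GetPort() *uint32
}

// watcherCounter is an optional, test/diagnostic-only capability that
// implementations may choose to provide.
type watcherCounter interface {
	WatcherCount() int
}

// watcherCount probes a Server for the optional capability; the second
// return value reports whether the implementation supports it.
func watcherCount(s Server) (int, bool) {
	if wc, ok := s.(watcherCounter); ok {
		return wc.WatcherCount(), true
	}
	return 0, false
}

// grpcServer is a concrete implementation that happens to support counting.
type grpcServer struct{ watchers int }

func (g *grpcServer) GetPort() *uint32  { return nil }
func (g *grpcServer) WatcherCount() int { return g.watchers }
```

Tests can type-assert against the concrete type (or `watcherCounter`) without the exported interface ever changing, so external implementers of `Server` keep compiling.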
##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req *schemav1
		return &schemav1.DeleteSchemaResponse{Found: false}, nil
	}
	ids := make([][]byte, 0, len(results))
+	var deletedProps []*propertyv1.Property
	for _, result := range results {
		if result.DeleteTime() > 0 {
			continue
		}
		ids = append(ids, result.ID())
+		var p propertyv1.Property
+		if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr == nil {
+			deletedProps = append(deletedProps, &p)
+		}
Review Comment:
In `DeleteSchema`, unmarshal errors are silently ignored
(`protojson.Unmarshal` failures just skip adding to `deletedProps`). That means
the server can successfully delete a schema but never broadcast a delete event
for it, leaving watch-based clients stuck with stale cache. Consider treating
unmarshal failures as an error (fail the delete) or at least logging and
broadcasting a minimal delete event that still allows clients to evict by
ID/revision.
##########
banyand/metadata/schema/property/client.go:
##########
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+	if r.closer.AddRunning() {
+		go func() {
+			defer r.closer.Done()
+			r.watchLoop(nodeName, client.update)
+		}()
+	}
+}
+
+const (
+	watchInitialBackoff = time.Second
+	watchMaxBackoff     = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient schemav1.SchemaUpdateServiceClient) {
+	backoff := watchInitialBackoff
+	ctx := r.closer.Ctx()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		err := r.consumeWatchStream(ctx, updateClient)
+		if err != nil {
+			st, ok := status.FromError(err)
+			if ok && st.Code() == codes.Unimplemented {
+				r.l.Debug().Str("node", nodeName).Msg("watch: server does not support WatchSchemas, stopping watch")
+				return
+			}
+			r.l.Warn().Err(err).Str("node", nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(backoff):
+		}
+		backoff *= 2
+		if backoff > watchMaxBackoff {
+			backoff = watchMaxBackoff
+		}
+	}
Review Comment:
`watchLoop` retries forever using a client bound to the original gRPC
connection. If the connection is closed and the node is removed from
`ConnManager`, this loop will continue retrying indefinitely (it only stops
when the whole registry closes), which is both noisy and wasteful. Tie the
watch attempt to the current active connection (e.g., call `WatchSchemas`
inside `connMgr.Execute(nodeName, ...)` each retry) and exit when the node is
no longer active/registered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]