Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
hanahmily merged PR #995: URL: https://github.com/apache/skywalking-banyandb/pull/995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909956253
##
banyand/metadata/schema/property/client.go:
##
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context)
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active
connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing
existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch
session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for
full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909954608
##
banyand/metadata/schema/property/client.go:
##
@@ -1063,260 +994,345 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context)
{
case <-r.closer.CloseNotify():
return
case <-ticker.C:
- r.performSync(ctx)
+ r.syncRound++
+ if r.syncRound%r.fullReconcileEvery == 0 {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting full reconcile")
+ r.performFullSync(ctx)
+ } else {
+ r.l.Debug().Uint64("round",
r.syncRound).Msg("syncLoop: starting incremental sync")
+ r.performIncrementalSync(ctx)
+ }
}
}
}
-func (r *SchemaRegistry) performSync(ctx context.Context) {
- round := atomic.AddUint64(&r.syncRound, 1)
- isFullReconcile := r.shouldFullReconcile(round)
- names := r.connMgr.ActiveNames()
- if len(names) == 0 {
- r.l.Debug().Uint64("round", round).Msg("performSync: no active
connections, skipping")
- return
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if old, exists := r.watchSessions[nodeName]; exists {
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: closing
existing watch session")
+ old.cancelFn()
+ delete(r.watchSessions, nodeName)
+ }
+ ctx, cancel := context.WithCancel(r.closer.Ctx())
+ session := &watchSession{
+ cancelFn: cancel,
+ syncReqCh: make(chan *syncRequest, 1),
+ syncResCh: make(chan []*digestEntry, 1),
}
- r.l.Debug().Uint64("round", round).Bool("fullReconcile",
isFullReconcile).
- Int("activeNodes", len(names)).Msg("performSync: starting")
- collector := newSyncCollector()
- if isFullReconcile {
- r.collectFullSync(ctx, collector)
+ r.watchSessions[nodeName] = session
+ r.l.Debug().Str("node", nodeName).Msg("launchWatch: starting new watch
session")
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(ctx, nodeName, client.update, session)
+ }()
} else {
- r.collectIncrementalSync(ctx, collector)
+ cancel()
+ delete(r.watchSessions, nodeName)
}
- r.l.Debug().Uint64("round", round).Int("collectedKinds",
len(collector.entries)).Msg("performSync: reconciling")
- r.reconcileFromCollector(collector)
}
-func (r *SchemaRegistry) collectFullSync(ctx context.Context, collector
*syncCollector) {
- kindsToSync := r.getKindsToSync()
- if len(kindsToSync) == 0 {
- return
- }
- broadcastErr := r.broadcastAll(func(nodeName string, sc *schemaClient)
error {
- for _, kind := range kindsToSync {
- query := buildSchemaQuery(kind, "", "", 0)
- schemas, queryErr := r.querySchemasFromClient(ctx,
sc.management, query)
- if queryErr != nil {
- return fmt.Errorf("failed to query schemas for
full sync for kind: %s", kind)
- }
- if r.l.Debug().Enabled() {
- activeSchemas := 0
- deletedSchemas := 0
- for _, s := range schemas {
- if s.deleteTime > 0 {
- deletedSchemas++
- continue
- }
- activeSchemas++
- }
- r.l.Debug().Stringer("kind", kind).Str("node",
nodeName).
- Int("schemas", len(schemas)).
- Int("activeSchemas", activeSchemas).
- Int("deletedSchemas", deletedSchemas).
- Msg("collectFullSync: collected")
- }
- collector.add(nodeName, kind, schemas)
- }
- return nil
- })
- if broadcastErr != nil {
- r.l.Error().Err(broadcastErr).Msg("failed to collect full sync
from some nodes")
+func (r *SchemaRegistry) stopWatch(nodeName string) {
+ r.watchMu.Lock()
+ defer r.watchMu.Unlock()
+ if session, exists := r.watchSessions[nodeName]; exists {
+
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909951174
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l*logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
+ if !ok {
+ return nil
+ }
+ if replayErr := s.replaySchemas(stream, req); replayErr
!= nil {
+ return replayErr
+ }
+ if doneErr :=
stream.Send(&schemav1.WatchSchemasResponse{
+ EventType:
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
+ }); doneErr != nil {
+ return doneErr
+ }
+ case resp, ok := <-ch:
+ if !ok {
+ return nil
+ }
+ if sendErr := stream.Send(resp); sendErr != nil {
+ return sendErr
+ }
+ }
}
- req.Query.Groups = []string{schema.SchemaGroup}
- results, queryErr := s.server.db.Query(ctx, req.Query)
- if queryErr != nil {
- s.metrics.totalErr.Inc(1, "aggregate")
- s.l.Error().Err(queryErr).Msg("failed to aggregate schema
updates")
- return nil, queryErr
+}
+
+func (s *schemaUpdateServer) replaySchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+ req *schemav1.WatchSchemasRequest,
+) error {
+ metadataOnly := len(req.TagProjection) > 0
+ query := &propertyv1.QueryRequest{
+ Groups: []string{schema.SchemaGroup},
+ Criteria: req.Criteria,
+ }
+ results, err := s.server.db.Query(stream.Context(), query)
+ if err != nil {
+ return err
}
- nameSet := make(map[string]struct{}, len(results))
for _, result := range results {
var p propertyv1.Property
if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr != nil {
- s.metrics.totalErr.Inc(1, "aggregate")
- return nil, unmarshalErr
+ s.l.Warn().Err(unmarshalErr).Msg("replay: failed to
unmarshal property")
+ continue
+ }
Review Comment:
Return an error when `Unmarshal` fails.
##
banyand/metadata/schema/schemaserver/watcher.go:
##
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for add
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909949752
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l*logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
Review Comment:
Added an io.EOF check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
Copilot commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909877731
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l*logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
Review Comment:
In WatchSchemas, when the client closes its send direction, stream.Recv()
will return io.EOF. The current goroutine forwards that as an error, causing
the RPC to return io.EOF instead of a clean nil termination. Handle io.EOF
explicitly (return nil) to avoid surfacing it as a stream error.
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l*logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
hanahmily commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2905200697
##
api/proto/banyandb/schema/v1/internal.proto:
##
@@ -85,4 +85,19 @@ message AggregateSchemaUpdatesResponse {
service SchemaUpdateService {
rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns
(AggregateSchemaUpdatesResponse);
+ rpc WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse);
Review Comment:
I think keeping WatchSchemas as server-streaming (rpc
WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse)) is the
better fit for a “watch” API here.
##
api/proto/banyandb/schema/v1/internal.proto:
##
@@ -85,4 +85,19 @@ message AggregateSchemaUpdatesResponse {
service SchemaUpdateService {
rpc AggregateSchemaUpdates(AggregateSchemaUpdatesRequest) returns
(AggregateSchemaUpdatesResponse);
+ rpc WatchSchemas(WatchSchemasRequest) returns (stream WatchSchemasResponse);
Review Comment:
Can the synchronization process also utilize the streaming service? If so,
the AggregateSchemaUpdates can be eliminated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904821682
##
banyand/metadata/schema/schemaserver/service.go:
##
@@ -70,6 +70,7 @@ type Server interface {
run.Service
GetPort() *uint32
RegisterGossip(messenger gossip.Messenger)
+ WatcherCount() int
}
Review Comment:
Removed this function from the interface and added a helper function for the
tests to use when checking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904819286
##
banyand/metadata/schema/property/client_test.go:
##
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string,
func()) {
}
time.Sleep(50 * time.Millisecond)
}
- return addr, func() { srv.GracefulStop() }
+ return srv, addr, func() { srv.GracefulStop() }
}
Review Comment:
Added a new `sync.Once` to fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904818116
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx
context.Context, req *schemav1
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
ids := make([][]byte, 0, len(results))
+ var deletedProps []*propertyv1.Property
for _, result := range results {
if result.DeleteTime() > 0 {
continue
}
ids = append(ids, result.ID())
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr == nil {
+ deletedProps = append(deletedProps, &p)
+ }
Review Comment:
If unmarshalling fails, return the error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904815712
##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(nodeName, client.update)
+ }()
+ }
+}
+
+const (
+ watchInitialBackoff = time.Second
+ watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient
schemav1.SchemaUpdateServiceClient) {
+ backoff := watchInitialBackoff
+ ctx := r.closer.Ctx()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ err := r.consumeWatchStream(ctx, updateClient)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch:
server does not support WatchSchemas, stopping watch")
+ return
+ }
+ r.l.Warn().Err(err).Str("node",
nodeName).Dur("backoff", backoff).Msg("watch stream disconnected, will retry")
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ }
+ backoff *= 2
+ if backoff > watchMaxBackoff {
+ backoff = watchMaxBackoff
+ }
+ }
Review Comment:
Cancel the loop when the node connection has been shut down.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904808733
##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(nodeName, client.update)
+ }()
+ }
+}
Review Comment:
Add a new map to cache all the launched watchers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
codecov-commenter commented on PR #995: URL: https://github.com/apache/skywalking-banyandb/pull/995#issuecomment-4022984203 ## [Codecov](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report :white_check_mark: All modified and coverable lines are covered by tests. :white_check_mark: Project coverage is 30.04%. Comparing base ([`3530dd9`](https://app.codecov.io/gh/apache/skywalking-banyandb/commit/3530dd9b5cefaa4fef60eec91ecbfbdfcaf243c5?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)) to head ([`f156499`](https://app.codecov.io/gh/apache/skywalking-banyandb/commit/f156499500d2e0ebfd5fbc49ee1c6b2c9fee142e?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)). :warning: Report is 147 commits behind head on main. > :exclamation: There is a different number of reports uploaded between BASE (3530dd9) and HEAD (f156499). Click for more details. > > HEAD has 1 upload less than BASE > >| Flag | BASE (3530dd9) | HEAD (f156499) | >|--|--|--| >||1|0| > Additional details and impacted files ```diff @@ Coverage Diff @@ ## main #995 +/- ## === - Coverage 45.97% 30.04% -15.93% === Files 328 122 -206 Lines 5550512846-42659 === - Hits25520 3860-21660 + Misses 27909 8614-19295 + Partials 2076 372 -1704 ``` | [Flag](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | | |---|---|---| | [pkg](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `30.04% <ø> (?)` | | Flags with carried forward coverage won't be shown. 
[Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/skywalking-banyandb/pull/995?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :rocket: New features to boost your workflow: - :snowflake: [Test Analytics](https://docs.codecov.com/docs/test-analytics): Detect flaky tests, report on failures, and find test suite problems. - :package: [JS Bundle Analysis](https://docs.codecov.com/docs/javascript-bundle-analysis): Save yourself from yourself by tracking and limiting bundle sizes in JS merges. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Support client watch the schema server updates [skywalking-banyandb]
Copilot commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2904714137
##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(nodeName, client.update)
+ }()
+ }
+}
Review Comment:
`launchWatch` can be called multiple times for the same node
(reconnects/health flaps), and each call starts a new long-lived `watchLoop`
with no per-node lifecycle. This can lead to duplicated processing and
goroutine/log leaks. Please deduplicate per node (e.g., map[nodeName]cancel,
only one watch per node) and stop the watch when the node becomes
inactive/removed or when `Execute(nodeName, ...)` reports no active client.
##
banyand/metadata/schema/property/client_test.go:
##
@@ -81,7 +87,7 @@ func startTestSchemaServerStoppable(t *testing.T) (string,
func()) {
}
time.Sleep(50 * time.Millisecond)
}
- return addr, func() { srv.GracefulStop() }
+ return srv, addr, func() { srv.GracefulStop() }
}
Review Comment:
`startTestSchemaServerFull` both registers `t.Cleanup(func() {
srv.GracefulStop() })` and returns a `stopFn` that also calls
`srv.GracefulStop()`. Callers like `TestOnDelete_RemoveNode` invoke the
returned `stopFn`, so the server is stopped twice, which can lead to flaky
behavior depending on gRPC server stop idempotency. Consider removing the
internal `t.Cleanup` when returning a stop function, or make the returned stop
function idempotent (e.g., `sync.Once`).
##
banyand/metadata/schema/schemaserver/service.go:
##
@@ -70,6 +70,7 @@ type Server interface {
run.Service
GetPort() *uint32
RegisterGossip(messenger gossip.Messenger)
+ WatcherCount() int
}
Review Comment:
Adding `WatcherCount()` to the exported `schemaserver.Server` interface is
an API-breaking change for any external consumers and also introduces what
looks like test-only introspection into a production interface (it’s only used
in tests right now). If this is only needed for tests/diagnostics, consider
exposing it on the concrete server type (or via a separate optional interface)
instead of extending the main exported interface.
##
banyand/metadata/schema/schemaserver/grpc.go:
##
@@ -206,11 +215,16 @@ func (s *schemaManagementServer) DeleteSchema(ctx
context.Context, req *schemav1
return &schemav1.DeleteSchemaResponse{Found: false}, nil
}
ids := make([][]byte, 0, len(results))
+ var deletedProps []*propertyv1.Property
for _, result := range results {
if result.DeleteTime() > 0 {
continue
}
ids = append(ids, result.ID())
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr == nil {
+ deletedProps = append(deletedProps, &p)
+ }
Review Comment:
In `DeleteSchema`, unmarshal errors are silently ignored
(`protojson.Unmarshal` failures just skip adding to `deletedProps`). That means
the server can successfully delete a schema but never broadcast a delete event
for it, leaving watch-based clients stuck with stale cache. Consider treating
unmarshal failures as an error (fail the delete) or at least logging and
broadcasting a minimal delete event that still allows clients to evict by
ID/revision.
##
banyand/metadata/schema/property/client.go:
##
@@ -1068,6 +1069,102 @@ func (r *SchemaRegistry) syncLoop(ctx context.Context) {
}
}
+func (r *SchemaRegistry) launchWatch(nodeName string, client *schemaClient) {
+ if r.closer.AddRunning() {
+ go func() {
+ defer r.closer.Done()
+ r.watchLoop(nodeName, client.update)
+ }()
+ }
+}
+
+const (
+ watchInitialBackoff = time.Second
+ watchMaxBackoff = 30 * time.Second
+)
+
+func (r *SchemaRegistry) watchLoop(nodeName string, updateClient
schemav1.SchemaUpdateServiceClient) {
+ backoff := watchInitialBackoff
+ ctx := r.closer.Ctx()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ err := r.consumeWatchStream(ctx, updateClient)
+ if err != nil {
+ st, ok := status.FromError(err)
+ if ok && st.Code() == codes.Unimplemented {
+ r.l.Debug().Str("node", nodeName).Msg("watch:
server does not sup
