mrproliu commented on code in PR #995:
URL:
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909949752
##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx
context.Context, req *schemav1
type schemaUpdateServer struct {
schemav1.UnimplementedSchemaUpdateServiceServer
- server *server
- l *logger.Logger
- metrics *serverMetrics
+ server *server
+ watchers *watcherManager
+ l *logger.Logger
+ metrics *serverMetrics
}
-// AggregateSchemaUpdates returns distinct schema names that have been
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
- req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
- s.metrics.totalStarted.Inc(1, "aggregate")
- start := time.Now()
- defer func() {
- s.metrics.totalFinished.Inc(1, "aggregate")
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision
and metadata_only).
+// The server continuously pushes live events and replays historical data on
request.
+func (s *schemaUpdateServer) WatchSchemas(
+ stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest,
schemav1.WatchSchemasResponse],
+) error {
+ id, ch := s.watchers.Subscribe()
+ defer s.watchers.Unsubscribe(id)
+
+ reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+ reqErrCh := make(chan error, 1)
+ go func() {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ reqErrCh <- err
+ close(reqCh)
+ return
+ }
+ reqCh <- req
+ }
}()
- if req.Query == nil {
- return nil, errInvalidRequest("query is required")
+
+ for {
+ select {
+ case <-stream.Context().Done():
+ return stream.Context().Err()
+ case err := <-reqErrCh:
+ return err
+ case req, ok := <-reqCh:
Review Comment:
Added the io.EOF check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]