mrproliu commented on code in PR #995:
URL: 
https://github.com/apache/skywalking-banyandb/pull/995#discussion_r2909949752


##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -252,47 +278,115 @@ func (s *schemaManagementServer) RepairSchema(ctx 
context.Context, req *schemav1
 
 type schemaUpdateServer struct {
        schemav1.UnimplementedSchemaUpdateServiceServer
-       server  *server
-       l       *logger.Logger
-       metrics *serverMetrics
+       server   *server
+       watchers *watcherManager
+       l        *logger.Logger
+       metrics  *serverMetrics
 }
 
-// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
-func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
-       req *schemav1.AggregateSchemaUpdatesRequest,
-) (*schemav1.AggregateSchemaUpdatesResponse, error) {
-       s.metrics.totalStarted.Inc(1, "aggregate")
-       start := time.Now()
-       defer func() {
-               s.metrics.totalFinished.Inc(1, "aggregate")
-               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+// WatchSchemas implements bi-directional streaming for schema change events.
+// Clients send requests on the stream to trigger replay (with max_revision 
and metadata_only).
+// The server continuously pushes live events and replays historical data on 
request.
+func (s *schemaUpdateServer) WatchSchemas(
+       stream grpc.BidiStreamingServer[schemav1.WatchSchemasRequest, 
schemav1.WatchSchemasResponse],
+) error {
+       id, ch := s.watchers.Subscribe()
+       defer s.watchers.Unsubscribe(id)
+
+       reqCh := make(chan *schemav1.WatchSchemasRequest, 1)
+       reqErrCh := make(chan error, 1)
+       go func() {
+               for {
+                       req, err := stream.Recv()
+                       if err != nil {
+                               reqErrCh <- err
+                               close(reqCh)
+                               return
+                       }
+                       reqCh <- req
+               }
        }()
-       if req.Query == nil {
-               return nil, errInvalidRequest("query is required")
+
+       for {
+               select {
+               case <-stream.Context().Done():
+                       return stream.Context().Err()
+               case err := <-reqErrCh:
+                       return err
+               case req, ok := <-reqCh:

Review Comment:
   Added the io.EOF checker. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to