Copilot commented on code in PR #971:
URL: https://github.com/apache/skywalking-banyandb/pull/971#discussion_r2793015468


##########
banyand/property/gossip/service.go:
##########
@@ -238,7 +238,7 @@ func (s *service) Serve(stopCh chan struct{}) {
                lis, err := net.Listen("tcp", s.addr)
                if err != nil {
                        s.log.Error().Err(err).Msg("Failed to listen")
-                       close(stopCh)
+                       closer.CloseThenWait()
                        return
                }

Review Comment:
   On `net.Listen` failure, the goroutine returns without calling `wg.Done()`, 
so the second goroutine, which blocks on `wg.Wait()`, will wait forever. Call 
`wg.Done()` on every exit path (e.g., `defer wg.Done()` at the start of the 
goroutine) so that no goroutine is leaked.



##########
banyand/metadata/embeddedserver/server.go:
##########
@@ -30,20 +30,30 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       
"github.com/apache/skywalking-banyandb/banyand/metadata/schema/propertyserver"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+var (
+       schemaTypeEtcd     = "embededetcd"

Review Comment:
   The default schema storage type constant is spelled `embededetcd`, but the 
flag help text says `embedetcd`. This mismatch is user-facing and makes it easy 
to pass a value that won’t match the intended option. Align the constant and 
the help string (and consider validating unknown values).
   ```suggestion
        schemaTypeEtcd     = "embedetcd"
   ```



##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+               Limit:  1,
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists: name=%s, 
id=%s", req.Property.Metadata.Name, req.Property.Id)
+               }
+       }
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(updateErr).Msg("failed to insert schema")
+               return nil, updateErr
+       }
+       return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req 
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "update")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "update")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"update")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "update")
+               s.l.Error().Err(updateErr).Msg("failed to update schema")
+               return nil, updateErr
+       }
+       return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+       stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+       if req.Query == nil {
+               return errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "list")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "list")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "list")
+               s.l.Error().Err(queryErr).Msg("failed to list schemas")
+               return queryErr
+       }
+       for batchStart := 0; batchStart < len(results); batchStart += 
listSchemasBatchSize {
+               batchEnd := batchStart + listSchemasBatchSize
+               if batchEnd > len(results) {
+                       batchEnd = len(results)
+               }
+               batch := results[batchStart:batchEnd]
+               props := make([]*propertyv1.Property, 0, len(batch))
+               deleteTimes := make([]int64, 0, len(batch))
+               for _, result := range batch {
+                       var p propertyv1.Property
+                       if unmarshalErr := protojson.Unmarshal(result.Source(), 
&p); unmarshalErr != nil {
+                               s.metrics.totalErr.Inc(1, "list")
+                               return unmarshalErr
+                       }
+                       props = append(props, &p)
+                       deleteTimes = append(deleteTimes, result.DeleteTime())
+               }
+               if sendErr := 
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: 
deleteTimes}); sendErr != nil {
+                       s.metrics.totalErr.Inc(1, "list")
+                       return sendErr
+               }
+       }
+       return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req 
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+       if req.Delete == nil {
+               return nil, errInvalidRequest("delete request is required")
+       }
+       s.metrics.totalStarted.Inc(1, "delete")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "delete")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"delete")
+       }()
+       query := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Delete.Name,
+       }
+       if req.Delete.Id != "" {
+               query.Ids = []string{req.Delete.Id}
+       }
+       results, queryErr := s.server.db.Query(ctx, query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(queryErr).Msg("failed to delete schema")
+               return nil, queryErr
+       }
+       if len(results) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       ids := make([][]byte, 0, len(results))
+       for _, result := range results {
+               if result.DeleteTime() > 0 {
+                       continue
+               }
+               ids = append(ids, result.ID())
+       }
+       if len(ids) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       if deleteErr := s.server.db.Delete(ctx, ids); deleteErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+               return nil, deleteErr
+       }
+       return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req 
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "repair")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "repair")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"repair")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), 
req.Property, req.DeleteTime); repairErr != nil {
+               s.metrics.totalErr.Inc(1, "repair")
+               s.l.Error().Err(repairErr).Msg("failed to repair schema")
+               return nil, repairErr
+       }
+       return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req 
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "exist")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "exist")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       req.Query.Limit = 1
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "exist")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil
+}
+
+type schemaUpdateServer struct {
+       schemav1.UnimplementedSchemaUpdateServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
+func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
+       req *schemav1.AggregateSchemaUpdatesRequest,
+) (*schemav1.AggregateSchemaUpdatesResponse, error) {
+       s.metrics.totalStarted.Inc(1, "aggregate")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "aggregate")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+       }()
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "aggregate")
+               s.l.Error().Err(queryErr).Msg("failed to aggregate schema 
updates")
+               return nil, queryErr
+       }
+       nameSet := make(map[string]struct{}, len(results))
+       for _, result := range results {
+               var p propertyv1.Property
+               if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr != nil {
+                       s.metrics.totalErr.Inc(1, "aggregate")
+                       return nil, unmarshalErr
+               }
+               if p.Metadata != nil && p.Metadata.Name != "" {
+                       nameSet[p.Metadata.Name] = struct{}{}
+               }
+       }
+       names := make([]string, 0, len(nameSet))
+       for name := range nameSet {
+               names = append(names, name)
+       }
+       return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+}
+
+func errInvalidRequest(msg string) error {
+       return &invalidRequestError{msg: msg}
+}
+
+type invalidRequestError struct {
+       msg string
+}
+
+func (e *invalidRequestError) Error() string {
+       return "invalid request: " + e.msg
+}

Review Comment:
   Request validation errors are returned as plain Go errors (e.g., `invalid 
request: ...`), which will surface to gRPC clients as `codes.Unknown`. Convert 
these to `status.Error(codes.InvalidArgument, ...)` (or map this custom error 
type in an interceptor) so clients can reliably distinguish bad input from 
server failures.



##########
banyand/metadata/schema/propertyserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyserver implements a standalone gRPC server for schema 
property management.
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone property server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone property server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "property-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-property-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-property-server-root-path", "/tmp", 
"root storage path")
+       flagS.StringVar(&s.host, "schema-property-server-grpc-host", "", "the 
host of schema property server")
+       flagS.Uint32Var(&s.port, "schema-property-server-grpc-port", 17916, 
"the port of schema property server")
+       flagS.DurationVar(&s.flushTimeout, 
"schema-property-server-flush-timeout", 5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-property-server-expire-delete-timeout", time.Hour*24*7, "soft-delete 
expiration")
+       flagS.BoolVar(&s.tls, "schema-property-server-tls", false, "connection 
uses TLS if true")
+       flagS.StringVar(&s.certFile, "schema-property-server-cert-file", "", 
"the TLS cert file")
+       flagS.StringVar(&s.keyFile, "schema-property-server-key-file", "", "the 
TLS key file")
+       flagS.VarP(&s.maxRecvMsgSize, 
"schema-property-server-max-recv-msg-size", "", "max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-property-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-property-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-property-server-repair-quick-build-tree-time",
+               s.repairQuickBuildTreeTime, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-property-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"schema-property-server-min-file-snapshot-age", time.Hour, "the minimum age for 
file snapshots to be eligible for deletion")
+       return flagS
+}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920
+       }
+       s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 
10))
+       if s.addr == ":" {
+               return errors.New("no address")
+       }
+       if !s.tls {
+               return nil
+       }
+       if s.certFile == "" {
+               return errors.New("invalid server cert file")
+       }
+       if s.keyFile == "" {
+               return errors.New("invalid server key file")
+       }
+       return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+       s.l = logger.GetLogger("schema-property-server")
+       s.lfs = fs.NewLocalFileSystem()
+
+       grpcFactory := s.omr.With(propertyServerScope.SubScope("grpc"))
+       sm := newServerMetrics(grpcFactory)
+       s.schemaService = &schemaManagementServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+       s.updateService = &schemaUpdateServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+
+       if s.tls {
+               var tlsErr error
+               s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, 
s.keyFile, s.l)
+               if tlsErr != nil {
+                       return errors.Wrap(tlsErr, "failed to initialize TLS 
reloader for property server")
+               }
+       }
+
+       dataDir := filepath.Join(s.root, "schema-property", "data")
+       snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+       repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+       cfg := db.Config{
+               Location:               dataDir,
+               FlushInterval:          s.flushTimeout,
+               ExpireToDeleteDuration: s.expireTimeout,
+               Repair: db.RepairConfig{
+                       Enabled:            true,
+                       Location:           repairDir,
+                       BuildTreeCron:      s.repairBuildTreeCron,
+                       QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+                       TreeSlotCount:      s.repairTreeSlotCount,
+               },
+               Index: db.IndexConfig{
+                       BatchWaitSec:       5,
+                       WaitForPersistence: false,
+               },
+               Snapshot: db.SnapshotConfig{
+                       Location: snapshotDir,
+                       Func: func(ctx context.Context) (string, error) {
+                               s.snapshotMu.Lock()
+                               defer s.snapshotMu.Unlock()
+                               storage.DeleteStaleSnapshots(snapshotDir, 
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+                               sn := s.snapshotName()
+                               snapshot := s.db.TakeSnapShot(ctx, sn)
+                               if snapshot.Error != "" {
+                                       return "", fmt.Errorf("failed to find 
snapshot %s: %s", sn, snapshot.Error)
+                               }
+                               return path.Join(snapshotDir, sn, 
storage.DataDir), nil
+                       },
+               },
+       }
+
+       var openErr error
+       // nolint:contextcheck
+       s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs)
+       if openErr != nil {
+               return errors.Wrap(openErr, "failed to open property database")
+       }
+       s.l.Info().Str("root", s.root).Msg("property database initialized")
+       return nil
+}
+
+func (s *server) snapshotName() string {
+       s.snapshotSeq++
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
+}
+
+func (s *server) Serve() run.StopNotify {
+       var opts []grpclib.ServerOption
+       if s.tls {
+               if s.tlsReloader != nil {
+                       if startErr := s.tlsReloader.Start(); startErr != nil {
+                               s.l.Error().Err(startErr).Msg("Failed to start 
TLS reloader for property server")
+                               return s.closer.CloseNotify()
+                       }
+                       s.l.Info().Str("certFile", s.certFile).Str("keyFile", 
s.keyFile).
+                               Msg("Started TLS file monitoring for property 
server")
+                       tlsConfig := s.tlsReloader.GetTLSConfig()
+                       creds := credentials.NewTLS(tlsConfig)
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               } else {
+                       creds, tlsErr := 
credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+                       if tlsErr != nil {
+                               s.l.Error().Err(tlsErr).Msg("Failed to load TLS 
credentials")
+                               return s.closer.CloseNotify()
+                       }
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               }
+       }
+       grpcPanicRecoveryHandler := func(p any) (err error) {
+               s.l.Error().Interface("panic", p).Str("stack", 
string(debug.Stack())).Msg("recovered from panic")
+               return status.Errorf(codes.Internal, "%s", p)
+       }
+       streamChain := []grpclib.StreamServerInterceptor{
+               
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+       }
+       unaryChain := []grpclib.UnaryServerInterceptor{
+               grpc_validator.UnaryServerInterceptor(),
+               
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+       }
+       opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
+               grpclib.ChainUnaryInterceptor(unaryChain...),
+               grpclib.ChainStreamInterceptor(streamChain...),
+       )
+       s.ser = grpclib.NewServer(opts...)
+       schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService)
+       schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService)
+       grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
+
+       s.closer.AddRunning()
+       go func() {
+               defer s.closer.Done()
+               lis, lisErr := net.Listen("tcp", s.addr)
+               if lisErr != nil {
+                       s.l.Error().Err(lisErr).Msg("Failed to listen")

Review Comment:
   If `net.Listen` fails, the goroutine started by `Serve()` logs the error and 
returns without closing the server's `Closer`. Since `Serve()` returns 
`s.closer.CloseNotify()`, callers may block forever waiting for shutdown. Close 
the closer (or otherwise make sure the returned stop-notify channel is closed) 
when the listener cannot be created.
   ```suggestion
                        s.l.Error().Err(lisErr).Msg("Failed to listen")
                        s.closer.Close()
   ```



##########
banyand/metadata/schema/propertyserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyserver implements a standalone gRPC server for schema 
property management.
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone property server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone property server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "property-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-property-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-property-server-root-path", "/tmp", 
"root storage path")
+       flagS.StringVar(&s.host, "schema-property-server-grpc-host", "", "the 
host of schema property server")
+       flagS.Uint32Var(&s.port, "schema-property-server-grpc-port", 17916, 
"the port of schema property server")
+       flagS.DurationVar(&s.flushTimeout, 
"schema-property-server-flush-timeout", 5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-property-server-expire-delete-timeout", time.Hour*24*7, "soft-delete 
expiration")
+       flagS.BoolVar(&s.tls, "schema-property-server-tls", false, "connection 
uses TLS if true")
+       flagS.StringVar(&s.certFile, "schema-property-server-cert-file", "", 
"the TLS cert file")
+       flagS.StringVar(&s.keyFile, "schema-property-server-key-file", "", "the 
TLS key file")
+       flagS.VarP(&s.maxRecvMsgSize, 
"schema-property-server-max-recv-msg-size", "", "max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-property-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-property-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-property-server-repair-quick-build-tree-time",
+               s.repairQuickBuildTreeTime, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-property-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")

Review Comment:
   `schema-property-server-repair-quick-build-tree-time` is registered with a 
default of `s.repairQuickBuildTreeTime`, which is 0 unless set elsewhere. With 
a 0 duration, repair build-tree can be triggered immediately on the first 
update, which differs from the property service default (10m) and may cause 
unexpected load. Set an explicit non-zero default here (or document why 0 is 
desired).



##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{

Review Comment:
   The server persists schema `Property` records without setting 
`Metadata.CreateRevision`/`Metadata.ModRevision` (and `UpdatedAt`). Since the 
property DB uses `Metadata.ModRevision` as the document timestamp and as part 
of `GetPropertyID`, leaving it at 0 makes all revisions indistinguishable and 
undermines update semantics. Consider assigning `CreateRevision` on first 
insert and updating `ModRevision` (and `UpdatedAt`) on every 
insert/update/repair, similar to the existing property Apply flow.



##########
banyand/property/scheduler.go:
##########
@@ -0,0 +1,113 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "math/rand/v2"
+       "time"
+
+       "github.com/robfig/cron/v3"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+type repairScheduler struct {
+       metadata        metadata.Repo
+       gossipMessenger gossip.Messenger
+       scheduler       *timestamp.Scheduler
+}
+
+func newRepairScheduler(ctx context.Context, l *logger.Logger, cronExp string, 
metadata metadata.Repo, messenger gossip.Messenger) (*repairScheduler, error) {
+       r := &repairScheduler{
+               metadata:        metadata,
+               gossipMessenger: messenger,
+               scheduler:       timestamp.NewScheduler(l, 
timestamp.NewClock()),
+       }
+       err := r.scheduler.Register("trigger", 
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+               cronExp, func(time.Time, *logger.Logger) bool {
+                       l.Debug().Msgf("starting background repair gossip")
+                       group, shardNum, nodes, gossipErr := 
r.doRepairGossip(ctx)
+                       if gossipErr != nil {
+                               l.Err(gossipErr).Msg("failed to repair gossip")
+                               return true
+                       }
+                       l.Info().Str("group", group).Uint32("shardNum", 
shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled")
+                       return true
+               })
+       if err != nil {
+               return nil, fmt.Errorf("failed to add repair trigger cron task: 
%w", err)
+       }
+       if err != nil {
+               return nil, fmt.Errorf("failed to add repair build tree cron 
task: %w", err)
+       }

Review Comment:
   `newRepairScheduler` checks `err` twice. The first check already returns 
when `err` is non-nil, so the body of the second `if err != nil` is dead code 
(and it carries the wrong "build tree" message). Remove the duplicate check; if 
there was meant to be a second cron registration, add it explicitly and check 
its returned error instead.
   ```suggestion
   
   ```



##########
api/proto/banyandb/schema/v1/internal.proto:
##########
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.schema.v1;
+
+import "banyandb/property/v1/property.proto";
+import "banyandb/property/v1/rpc.proto";
+import "google/protobuf/timestamp.proto";
+
+option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1";
+option java_package = "org.apache.skywalking.banyandb.schema.v1";
+
+message InsertSchemaRequest {
+  property.v1.Property property = 1;
+}
+
+message InsertSchemaResponse {}
+
+message UpdateSchemaRequest {
+  property.v1.Property property = 1;
+}
+
+message UpdateSchemaResponse {}
+
+message ListSchemasRequest {
+  property.v1.QueryRequest query = 1;
+}
+
+message ListSchemasResponse {
+  repeated property.v1.Property properties = 1;
+  // delete_times maps to properties in the same order.
+  // 0 means not deleted, >0 means deletion timestamp.
+  repeated int64 delete_times = 2;
+}
+
+message GetSchemaRequest {
+  property.v1.QueryRequest query = 1;
+}
+
+message GetSchemaResponse {
+  property.v1.Property properties = 1;

Review Comment:
   `GetSchemaResponse` defines a singular `Property` field but names it 
`properties`. This is confusing and will be part of the public proto contract 
once released. Rename the field to `property` (or make it `repeated` if 
multiple are intended) before consumers depend on it.
   ```suggestion
     property.v1.Property property = 1;
   ```



##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+               Limit:  1,
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists: name=%s, 
id=%s", req.Property.Metadata.Name, req.Property.Id)
+               }
+       }
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(updateErr).Msg("failed to insert schema")
+               return nil, updateErr
+       }
+       return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req 
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "update")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "update")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"update")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "update")
+               s.l.Error().Err(updateErr).Msg("failed to update schema")
+               return nil, updateErr
+       }
+       return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+       stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+       if req.Query == nil {
+               return errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "list")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "list")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "list")
+               s.l.Error().Err(queryErr).Msg("failed to list schemas")
+               return queryErr
+       }
+       for batchStart := 0; batchStart < len(results); batchStart += 
listSchemasBatchSize {
+               batchEnd := batchStart + listSchemasBatchSize
+               if batchEnd > len(results) {
+                       batchEnd = len(results)
+               }
+               batch := results[batchStart:batchEnd]
+               props := make([]*propertyv1.Property, 0, len(batch))
+               deleteTimes := make([]int64, 0, len(batch))
+               for _, result := range batch {
+                       var p propertyv1.Property
+                       if unmarshalErr := protojson.Unmarshal(result.Source(), 
&p); unmarshalErr != nil {
+                               s.metrics.totalErr.Inc(1, "list")
+                               return unmarshalErr
+                       }
+                       props = append(props, &p)
+                       deleteTimes = append(deleteTimes, result.DeleteTime())
+               }
+               if sendErr := 
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: 
deleteTimes}); sendErr != nil {
+                       s.metrics.totalErr.Inc(1, "list")
+                       return sendErr
+               }
+       }
+       return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req 
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+       if req.Delete == nil {
+               return nil, errInvalidRequest("delete request is required")
+       }
+       s.metrics.totalStarted.Inc(1, "delete")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "delete")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"delete")
+       }()
+       query := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Delete.Name,
+       }
+       if req.Delete.Id != "" {
+               query.Ids = []string{req.Delete.Id}
+       }
+       results, queryErr := s.server.db.Query(ctx, query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(queryErr).Msg("failed to delete schema")
+               return nil, queryErr
+       }
+       if len(results) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       ids := make([][]byte, 0, len(results))
+       for _, result := range results {
+               if result.DeleteTime() > 0 {
+                       continue
+               }
+               ids = append(ids, result.ID())
+       }
+       if len(ids) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       if deleteErr := s.server.db.Delete(ctx, ids); deleteErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+               return nil, deleteErr
+       }
+       return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req 
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "repair")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "repair")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"repair")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), 
req.Property, req.DeleteTime); repairErr != nil {
+               s.metrics.totalErr.Inc(1, "repair")
+               s.l.Error().Err(repairErr).Msg("failed to repair schema")
+               return nil, repairErr
+       }
+       return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req 
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "exist")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "exist")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       req.Query.Limit = 1
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "exist")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil

Review Comment:
   `ExistSchema` sets `Limit = 1` and returns `HasSchema: len(results) > 0`, 
which can incorrectly report existence for soft-deleted schemas and can miss 
active schemas if the single returned row happens to be a deleted revision. 
Scan results for at least one row with `DeleteTime()==0` and avoid `Limit=1` 
unless you can guarantee ordering that prefers non-deleted/latest revisions.
   ```suggestion
        results, queryErr := s.server.db.Query(ctx, req.Query)
        if queryErr != nil {
                s.metrics.totalErr.Inc(1, "exist")
                s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
                return nil, queryErr
        }
        hasSchema := false
        for _, result := range results {
                // Only a row that is not soft-deleted counts as existing.
                if result.DeleteTime() == 0 {
                        hasSchema = true
                        break
                }
        }
        return &schemav1.ExistSchemaResponse{HasSchema: hasSchema}, nil
   ```



##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+               Limit:  1,
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists: name=%s, 
id=%s", req.Property.Metadata.Name, req.Property.Id)
+               }
+       }
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(updateErr).Msg("failed to insert schema")
+               return nil, updateErr
+       }
+       return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req 
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "update")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "update")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"update")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "update")
+               s.l.Error().Err(updateErr).Msg("failed to update schema")
+               return nil, updateErr
+       }
+       return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+       stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+       if req.Query == nil {
+               return errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "list")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "list")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "list")
+               s.l.Error().Err(queryErr).Msg("failed to list schemas")
+               return queryErr
+       }
+       for batchStart := 0; batchStart < len(results); batchStart += 
listSchemasBatchSize {
+               batchEnd := batchStart + listSchemasBatchSize
+               if batchEnd > len(results) {
+                       batchEnd = len(results)
+               }
+               batch := results[batchStart:batchEnd]
+               props := make([]*propertyv1.Property, 0, len(batch))
+               deleteTimes := make([]int64, 0, len(batch))
+               for _, result := range batch {
+                       var p propertyv1.Property
+                       if unmarshalErr := protojson.Unmarshal(result.Source(), 
&p); unmarshalErr != nil {
+                               s.metrics.totalErr.Inc(1, "list")
+                               return unmarshalErr
+                       }
+                       props = append(props, &p)
+                       deleteTimes = append(deleteTimes, result.DeleteTime())
+               }
+               if sendErr := 
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: 
deleteTimes}); sendErr != nil {
+                       s.metrics.totalErr.Inc(1, "list")
+                       return sendErr
+               }
+       }
+       return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req 
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+       if req.Delete == nil {
+               return nil, errInvalidRequest("delete request is required")
+       }
+       s.metrics.totalStarted.Inc(1, "delete")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "delete")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"delete")
+       }()
+       query := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Delete.Name,
+       }
+       if req.Delete.Id != "" {
+               query.Ids = []string{req.Delete.Id}
+       }
+       results, queryErr := s.server.db.Query(ctx, query)
+       if queryErr != nil {

Review Comment:
   `DeleteSchemaRequest` includes `update_at` (comment says it's for 
notification), but `DeleteSchema` currently ignores it and does not update the 
stored record's revision/time accordingly. If downstream consumers rely on this 
for change notification, wire `update_at` into the delete marker (e.g., set 
`ModRevision`/`UpdatedAt` when marking deleted).



##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+               Limit:  1,
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists: name=%s, 
id=%s", req.Property.Metadata.Name, req.Property.Id)
+               }

Review Comment:
   `InsertSchema` checks for duplicates with `Limit: 1`. The underlying 
`Search` results are not guaranteed to be ordered (callers sort explicitly 
elsewhere), so a single returned row could be a deleted revision while an 
active (non-deleted) revision exists, allowing duplicates. Query without such a 
limit (or ensure deterministic ordering) and treat the schema as existing if 
*any* matching row has `DeleteTime()==0`. Also consider returning a gRPC 
`AlreadyExists` status rather than a plain `fmt.Errorf`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to