Copilot commented on code in PR #971:
URL: https://github.com/apache/skywalking-banyandb/pull/971#discussion_r2796374791


##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -0,0 +1,328 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var schemaServerScope = observability.RootScope.SubScope("schema_server")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")

Review Comment:
   The validation error message for `InsertSchema` says "mod_revision should be 
set for update", which is confusing in the insert path. Consider adjusting the 
wording (and reusing the same wording consistently across insert/update/repair) 
to reflect the operation accurately.
   ```suggestion
                return nil, errInvalidRequest("mod_revision should be set")
   ```



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schemaserver implements a standalone gRPC server for schema 
property management.
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone schema server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone schema server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "schema-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root 
storage path")
+       flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of 
schema server")
+       flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of 
schema server")
+       flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 
5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration")
+       flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS 
if true")
+       flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS 
cert file")
+       flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key 
file")
+       flagS.VarP(&s.maxRecvMsgSize, "schema-server-max-recv-msg-size", "", 
"max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-server-repair-quick-build-tree-time",
+               time.Minute*10, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file 
snapshots to be eligible for deletion")
+       return flagS
+}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920
+       }
+       s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 
10))
+       if s.addr == ":" {
+               return errors.New("no address")
+       }
+       if !s.tls {
+               return nil
+       }
+       if s.certFile == "" {
+               return errors.New("invalid server cert file")
+       }
+       if s.keyFile == "" {
+               return errors.New("invalid server key file")
+       }
+       return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+       s.l = logger.GetLogger("schema-server")
+       s.lfs = fs.NewLocalFileSystem()
+
+       grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
+       sm := newServerMetrics(grpcFactory)
+       s.schemaService = &schemaManagementServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+       s.updateService = &schemaUpdateServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+
+       if s.tls {
+               var tlsErr error
+               s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, 
s.keyFile, s.l)
+               if tlsErr != nil {
+                       return errors.Wrap(tlsErr, "failed to initialize TLS 
reloader for server")
+               }
+       }
+
+       dataDir := filepath.Join(s.root, "schema-property", "data")
+       snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+       repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+       cfg := db.Config{
+               Location:               dataDir,
+               FlushInterval:          s.flushTimeout,
+               ExpireToDeleteDuration: s.expireTimeout,
+               Repair: db.RepairConfig{
+                       Enabled:            true,
+                       Location:           repairDir,
+                       BuildTreeCron:      s.repairBuildTreeCron,
+                       QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+                       TreeSlotCount:      s.repairTreeSlotCount,
+               },
+               Index: db.IndexConfig{
+                       BatchWaitSec:       5,
+                       WaitForPersistence: false,
+               },
+               Snapshot: db.SnapshotConfig{
+                       Location: snapshotDir,
+                       Func: func(ctx context.Context) (string, error) {
+                               s.snapshotMu.Lock()
+                               defer s.snapshotMu.Unlock()
+                               storage.DeleteStaleSnapshots(snapshotDir, 
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+                               sn := s.snapshotName()
+                               snapshot := s.db.TakeSnapShot(ctx, sn)
+                               if snapshot.Error != "" {
+                                       return "", fmt.Errorf("failed to find 
snapshot %s: %s", sn, snapshot.Error)
+                               }
+                               return path.Join(snapshotDir, sn, 
storage.DataDir), nil
+                       },
+               },
+       }
+
+       var openErr error
+       // nolint:contextcheck
+       s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs)
+       if openErr != nil {
+               return errors.Wrap(openErr, "failed to open property database")
+       }
+       s.l.Info().Str("root", s.root).Msg("schema property database 
initialized")
+       return nil
+}
+
+func (s *server) snapshotName() string {
+       s.snapshotSeq++
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
+}
+
+func (s *server) Serve() run.StopNotify {
+       var opts []grpclib.ServerOption
+       if s.tls {
+               if s.tlsReloader != nil {
+                       if startErr := s.tlsReloader.Start(); startErr != nil {
+                               s.l.Error().Err(startErr).Msg("Failed to start 
TLS reloader for schema server")
+                               return s.closer.CloseNotify()
+                       }
+                       s.l.Info().Str("certFile", s.certFile).Str("keyFile", 
s.keyFile).
+                               Msg("Started TLS file monitoring for schema 
server")
+                       tlsConfig := s.tlsReloader.GetTLSConfig()
+                       creds := credentials.NewTLS(tlsConfig)
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               } else {
+                       creds, tlsErr := 
credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+                       if tlsErr != nil {
+                               s.l.Error().Err(tlsErr).Msg("Failed to load TLS 
credentials")
+                               return s.closer.CloseNotify()
+                       }
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               }
+       }
+       grpcPanicRecoveryHandler := func(p any) (err error) {
+               s.l.Error().Interface("panic", p).Str("stack", 
string(debug.Stack())).Msg("recovered from panic")
+               return status.Errorf(codes.Internal, "%s", p)
+       }
+       streamChain := []grpclib.StreamServerInterceptor{
+               
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+       }
+       unaryChain := []grpclib.UnaryServerInterceptor{
+               grpc_validator.UnaryServerInterceptor(),
+               
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+       }
+       opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
+               grpclib.ChainUnaryInterceptor(unaryChain...),
+               grpclib.ChainStreamInterceptor(streamChain...),
+       )
+       s.ser = grpclib.NewServer(opts...)
+       schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService)
+       schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService)
+       grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
+
+       s.closer.AddRunning()
+       go func() {
+               defer s.closer.Done()
+               lis, lisErr := net.Listen("tcp", s.addr)
+               if lisErr != nil {
+                       s.l.Error().Err(lisErr).Msg("Failed to listen")
+                       return
+               }

Review Comment:
   On `net.Listen` failure, the goroutine returns without canceling/closing 
`s.closer`, so `Serve()` continues to return a StopNotify channel that never 
closes. Consider signaling failure by closing/canceling the closer (ensuring 
you don’t deadlock on the running counter), so callers don’t block indefinitely.



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schemaserver implements a standalone gRPC server for schema 
property management.
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone schema server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone schema server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "schema-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root 
storage path")
+       flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of 
schema server")
+       flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of 
schema server")
+       flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 
5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration")
+       flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS 
if true")
+       flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS 
cert file")
+       flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key 
file")
+       flagS.VarP(&s.maxRecvMsgSize, "schema-server-max-recv-msg-size", "", 
"max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-server-repair-quick-build-tree-time",
+               time.Minute*10, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file 
snapshots to be eligible for deletion")
+       return flagS
+}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920
+       }
+       s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 
10))
+       if s.addr == ":" {
+               return errors.New("no address")
+       }
+       if !s.tls {
+               return nil
+       }
+       if s.certFile == "" {
+               return errors.New("invalid server cert file")
+       }
+       if s.keyFile == "" {
+               return errors.New("invalid server key file")
+       }
+       return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+       s.l = logger.GetLogger("schema-server")
+       s.lfs = fs.NewLocalFileSystem()
+
+       grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
+       sm := newServerMetrics(grpcFactory)
+       s.schemaService = &schemaManagementServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+       s.updateService = &schemaUpdateServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+
+       if s.tls {
+               var tlsErr error
+               s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, 
s.keyFile, s.l)
+               if tlsErr != nil {
+                       return errors.Wrap(tlsErr, "failed to initialize TLS 
reloader for server")
+               }
+       }
+
+       dataDir := filepath.Join(s.root, "schema-property", "data")
+       snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+       repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+       cfg := db.Config{
+               Location:               dataDir,
+               FlushInterval:          s.flushTimeout,
+               ExpireToDeleteDuration: s.expireTimeout,
+               Repair: db.RepairConfig{
+                       Enabled:            true,
+                       Location:           repairDir,
+                       BuildTreeCron:      s.repairBuildTreeCron,
+                       QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+                       TreeSlotCount:      s.repairTreeSlotCount,
+               },
+               Index: db.IndexConfig{
+                       BatchWaitSec:       5,
+                       WaitForPersistence: false,
+               },
+               Snapshot: db.SnapshotConfig{
+                       Location: snapshotDir,
+                       Func: func(ctx context.Context) (string, error) {
+                               s.snapshotMu.Lock()
+                               defer s.snapshotMu.Unlock()
+                               storage.DeleteStaleSnapshots(snapshotDir, 
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+                               sn := s.snapshotName()
+                               snapshot := s.db.TakeSnapShot(ctx, sn)
+                               if snapshot.Error != "" {
+                                       return "", fmt.Errorf("failed to find 
snapshot %s: %s", sn, snapshot.Error)
+                               }
+                               return path.Join(snapshotDir, sn, 
storage.DataDir), nil
+                       },

Review Comment:
   `path.Join(...)` is used to build filesystem paths under `snapshotDir`. On 
non-Unix platforms this can produce incorrect separators; the rest of the 
function uses `filepath.Join`. Prefer `filepath.Join` here as well for 
consistency and OS-correct path handling.



##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -0,0 +1,328 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"

	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
	schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
	"github.com/apache/skywalking-banyandb/banyand/observability"
	"github.com/apache/skywalking-banyandb/banyand/property/db"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/meter"
)
+
// schemaServerScope is the root metrics scope ("schema_server") for the
// standalone schema server; the gRPC handler counters live in a "grpc"
// sub-scope derived from it.
var schemaServerScope = observability.RootScope.SubScope("schema_server")
+
// serverMetrics holds the counters shared by the schema gRPC handlers.
// Every counter is labeled by the RPC "method" (e.g. "insert", "list").
type serverMetrics struct {
	// totalStarted is incremented once when a handler begins processing
	// (in most handlers, after the request has passed validation).
	totalStarted  meter.Counter
	// totalFinished is incremented when a handler that started returns,
	// whether it succeeded or failed.
	totalFinished meter.Counter
	// totalErr counts started requests that ended in an error.
	totalErr      meter.Counter
	// totalLatency accumulates handler wall-clock time, in seconds.
	totalLatency  meter.Counter
}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
// schemaManagementServer implements schemav1.SchemaManagementServiceServer
// (insert, update, list, delete, repair and existence checks) on top of the
// owning server's property database.
type schemaManagementServer struct {
	schemav1.UnimplementedSchemaManagementServiceServer
	// server is the owning schema server; handlers use its db.
	server  *server
	// l logs handler failures.
	l       *logger.Logger
	// metrics records per-method request counters.
	metrics *serverMetrics
}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists")
+               }
+       }
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(updateErr).Msg("failed to insert schema")
+               return nil, updateErr
+       }
+       return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req 
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "update")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "update")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"update")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "update")
+               s.l.Error().Err(updateErr).Msg("failed to update schema")
+               return nil, updateErr
+       }
+       return &schemav1.UpdateSchemaResponse{}, nil
+}
+
// listSchemasBatchSize caps how many properties ListSchemas packs into a
// single streamed ListSchemasResponse message.
const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+       stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+       if req.Query == nil {
+               return errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "list")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "list")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "list")
+               s.l.Error().Err(queryErr).Msg("failed to list schemas")
+               return queryErr
+       }
+       for batchStart := 0; batchStart < len(results); batchStart += 
listSchemasBatchSize {
+               batchEnd := batchStart + listSchemasBatchSize
+               if batchEnd > len(results) {
+                       batchEnd = len(results)
+               }
+               batch := results[batchStart:batchEnd]
+               props := make([]*propertyv1.Property, 0, len(batch))
+               deleteTimes := make([]int64, 0, len(batch))
+               for _, result := range batch {
+                       var p propertyv1.Property
+                       if unmarshalErr := protojson.Unmarshal(result.Source(), 
&p); unmarshalErr != nil {
+                               s.metrics.totalErr.Inc(1, "list")
+                               return unmarshalErr
+                       }
+                       props = append(props, &p)
+                       deleteTimes = append(deleteTimes, result.DeleteTime())
+               }
+               if sendErr := 
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: 
deleteTimes}); sendErr != nil {
+                       s.metrics.totalErr.Inc(1, "list")
+                       return sendErr
+               }
+       }
+       return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req 
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+       if req.Delete == nil || req.UpdateAt == nil {
+               return nil, errInvalidRequest("delete request is required")
+       }
+       s.metrics.totalStarted.Inc(1, "delete")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "delete")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"delete")
+       }()
+       query := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Delete.Name,
+       }
+       if req.Delete.Id != "" {
+               query.Ids = []string{req.Delete.Id}
+       }
+       results, queryErr := s.server.db.Query(ctx, query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(queryErr).Msg("failed to delete schema")
+               return nil, queryErr
+       }
+       if len(results) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       ids := make([][]byte, 0, len(results))
+       for _, result := range results {
+               if result.DeleteTime() > 0 {
+                       continue
+               }
+               ids = append(ids, result.ID())
+       }
+       if len(ids) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       if deleteErr := s.server.db.Delete(ctx, ids, req.UpdateAt.AsTime()); 
deleteErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+               return nil, deleteErr
+       }
+       return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req 
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "repair")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "repair")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"repair")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), 
req.Property, req.DeleteTime); repairErr != nil {
+               s.metrics.totalErr.Inc(1, "repair")
+               s.l.Error().Err(repairErr).Msg("failed to repair schema")
+               return nil, repairErr
+       }
+       return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req 
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "exist")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "exist")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       req.Query.Limit = 1
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "exist")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil
+}
+
// schemaUpdateServer implements schemav1.SchemaUpdateServiceServer
// (AggregateSchemaUpdates) on top of the owning server's property database.
type schemaUpdateServer struct {
	schemav1.UnimplementedSchemaUpdateServiceServer
	// server is the owning schema server; handlers use its db.
	server  *server
	// l logs handler failures.
	l       *logger.Logger
	// metrics records per-method request counters.
	metrics *serverMetrics
}
+
+// AggregateSchemaUpdates returns distinct schema names that have been 
modified.
+func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
+       req *schemav1.AggregateSchemaUpdatesRequest,
+) (*schemav1.AggregateSchemaUpdatesResponse, error) {
+       s.metrics.totalStarted.Inc(1, "aggregate")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "aggregate")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"aggregate")
+       }()
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "aggregate")
+               s.l.Error().Err(queryErr).Msg("failed to aggregate schema 
updates")
+               return nil, queryErr
+       }
+       nameSet := make(map[string]struct{}, len(results))
+       for _, result := range results {
+               var p propertyv1.Property
+               if unmarshalErr := protojson.Unmarshal(result.Source(), &p); 
unmarshalErr != nil {
+                       s.metrics.totalErr.Inc(1, "aggregate")
+                       return nil, unmarshalErr
+               }
+               if p.Metadata != nil && p.Metadata.Name != "" {
+                       nameSet[p.Metadata.Name] = struct{}{}
+               }
+       }
+       names := make([]string, 0, len(nameSet))
+       for name := range nameSet {
+               names = append(names, name)
+       }
+       return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+}
+
+func errInvalidRequest(msg string) error {
+       return &invalidRequestError{msg: msg}
+}
+
+type invalidRequestError struct {
+       msg string
+}
+
+func (e *invalidRequestError) Error() string {
+       return "invalid request: " + e.msg
+}

Review Comment:
   `errInvalidRequest(...)` returns a plain Go error, so callers will receive 
gRPC code `Unknown` instead of `InvalidArgument`. Consider returning 
`status.Error(codes.InvalidArgument, ...)` (or implementing `GRPCStatus()` on 
the custom error) and similarly use appropriate status codes like 
`AlreadyExists` for duplicate inserts.



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schemaserver implements a standalone gRPC server for schema 
property management.
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
const (
	// schemaGroup is the property group every schema property is stored in;
	// handlers overwrite client-supplied groups with it.
	schemaGroup     = "_schema"
	// defaultShardID is the shard used for all schema property writes.
	defaultShardID  = common.ShardID(0)
	// defaultRecvSize is the default gRPC max receive message size (10 MiB).
	defaultRecvSize = 10 << 20
)
+
// Compile-time checks that *server implements the run framework lifecycle
// interfaces.
var (
	_ run.PreRunner = (*server)(nil)
	_ run.Config    = (*server)(nil)
	_ run.Service   = (*server)(nil)
)
+
// Server is the interface for the standalone schema server. It combines the
// run framework's PreRunner, Config and Service lifecycles with access to the
// configured gRPC port.
type Server interface {
	run.PreRunner
	run.Config
	run.Service
	// GetPort returns a pointer to the gRPC port the server is configured with.
	GetPort() *uint32
}
+
// server is the Server implementation. Fields below the service pointers are
// populated from flags (FlagSet) and Validate; db, lfs, l, the services and
// tlsReloader are set in PreRun; ser is created in Serve.
type server struct {
	// db is the schema property database opened in PreRun.
	db                       db.Database
	// lfs is the local filesystem, used for stale-snapshot cleanup.
	lfs                      fs.FileSystem
	omr                      observability.MetricsRegistry
	// closer tracks the serving goroutine; its context is passed to db.OpenDB.
	closer                   *run.Closer
	l                        *logger.Logger
	// ser is nil until Serve builds the gRPC server.
	ser                      *grpclib.Server
	// tlsReloader is non-nil only when TLS is enabled and PreRun succeeded.
	tlsReloader              *pkgtls.Reloader
	schemaService            *schemaManagementServer
	updateService            *schemaUpdateServer
	host                     string
	certFile                 string
	root                     string
	keyFile                  string
	// addr is the host:port listen address computed by Validate.
	addr                     string
	repairBuildTreeCron      string
	minFileSnapshotAge       time.Duration
	flushTimeout             time.Duration
	// snapshotSeq disambiguates snapshot names; only mutated under snapshotMu.
	snapshotSeq              uint64
	expireTimeout            time.Duration
	repairQuickBuildTreeTime time.Duration
	maxRecvMsgSize           run.Bytes
	maxFileSnapshotNum       int
	repairTreeSlotCount      int
	// snapshotMu serializes snapshot creation and stale-snapshot cleanup.
	snapshotMu               sync.Mutex
	port                     uint32
	tls                      bool
}
+
+// NewServer returns a new standalone schema server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "schema-server"
+}
+
// FlagSet registers the schema server's command-line flags, each bound to a
// field of s. All flag names carry the "schema-server-" prefix.
func (s *server) FlagSet() *run.FlagSet {
	flagS := run.NewFlagSet("schema-server")
	// VarP below takes no explicit default value, so seed the field first.
	s.maxRecvMsgSize = defaultRecvSize
	flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path")
	flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server")
	// NOTE(review): Validate falls back to 17920 when the port is 0, which
	// differs from this default of 17916 — confirm which value is intended.
	flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server")
	flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval")
	flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration")
	flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true")
	flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file")
	flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file")
	flagS.VarP(&s.maxRecvMsgSize, "schema-server-max-recv-msg-size", "", "max gRPC receive message size")
	// Repair-tree settings; PreRun copies them into db.RepairConfig.
	flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count")
	flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h",
		"cron for repair tree building")
	flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time",
		time.Minute*10, "schema-quick build tree duration")
	// Snapshot retention; passed to storage.DeleteStaleSnapshots in PreRun.
	flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed")
	flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion")
	return flagS
}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920
+       }
+       s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 
10))
+       if s.addr == ":" {
+               return errors.New("no address")
+       }
+       if !s.tls {
+               return nil
+       }
+       if s.certFile == "" {
+               return errors.New("invalid server cert file")
+       }
+       if s.keyFile == "" {
+               return errors.New("invalid server key file")
+       }
+       return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+       s.l = logger.GetLogger("schema-server")
+       s.lfs = fs.NewLocalFileSystem()
+
+       grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
+       sm := newServerMetrics(grpcFactory)
+       s.schemaService = &schemaManagementServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+       s.updateService = &schemaUpdateServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+
+       if s.tls {
+               var tlsErr error
+               s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, 
s.keyFile, s.l)
+               if tlsErr != nil {
+                       return errors.Wrap(tlsErr, "failed to initialize TLS 
reloader for server")
+               }
+       }
+
+       dataDir := filepath.Join(s.root, "schema-property", "data")
+       snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+       repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+       cfg := db.Config{
+               Location:               dataDir,
+               FlushInterval:          s.flushTimeout,
+               ExpireToDeleteDuration: s.expireTimeout,
+               Repair: db.RepairConfig{
+                       Enabled:            true,
+                       Location:           repairDir,
+                       BuildTreeCron:      s.repairBuildTreeCron,
+                       QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+                       TreeSlotCount:      s.repairTreeSlotCount,
+               },
+               Index: db.IndexConfig{
+                       BatchWaitSec:       5,
+                       WaitForPersistence: false,
+               },
+               Snapshot: db.SnapshotConfig{
+                       Location: snapshotDir,
+                       Func: func(ctx context.Context) (string, error) {
+                               s.snapshotMu.Lock()
+                               defer s.snapshotMu.Unlock()
+                               storage.DeleteStaleSnapshots(snapshotDir, 
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+                               sn := s.snapshotName()
+                               snapshot := s.db.TakeSnapShot(ctx, sn)
+                               if snapshot.Error != "" {
+                                       return "", fmt.Errorf("failed to find 
snapshot %s: %s", sn, snapshot.Error)
+                               }
+                               return path.Join(snapshotDir, sn, 
storage.DataDir), nil
+                       },
+               },
+       }
+
+       var openErr error
+       // nolint:contextcheck
+       s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs)
+       if openErr != nil {
+               return errors.Wrap(openErr, "failed to open property database")
+       }
+       s.l.Info().Str("root", s.root).Msg("schema property database 
initialized")
+       return nil
+}
+
+func (s *server) snapshotName() string {
+       s.snapshotSeq++
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
+}
+
// Serve builds the gRPC server and starts it on s.addr in a background
// goroutine tracked by s.closer, returning s.closer.CloseNotify().
//
// The server is configured with TLS credentials (when enabled), panic-recovery
// interceptors, unary request validation, and the configured max receive size.
// It registers the schema management, schema update, and health services.
//
// If TLS setup fails it logs and returns early without creating s.ser. A listen
// failure inside the goroutine is only logged.
func (s *server) Serve() run.StopNotify {
	var opts []grpclib.ServerOption
	if s.tls {
		if s.tlsReloader != nil {
			if startErr := s.tlsReloader.Start(); startErr != nil {
				s.l.Error().Err(startErr).Msg("Failed to start TLS reloader for schema server")
				return s.closer.CloseNotify()
			}
			s.l.Info().Str("certFile", s.certFile).Str("keyFile", s.keyFile).
				Msg("Started TLS file monitoring for schema server")
			// Credentials come from the reloader's TLS config, presumably so that
			// rotated certificates are picked up without a restart — confirm in
			// pkg/tls.
			tlsConfig := s.tlsReloader.GetTLSConfig()
			creds := credentials.NewTLS(tlsConfig)
			opts = []grpclib.ServerOption{grpclib.Creds(creds)}
		} else {
			// Fallback: static credentials loaded once from disk. A successful
			// PreRun always sets tlsReloader when TLS is enabled, so this branch
			// is reached only if Serve runs without one.
			creds, tlsErr := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
			if tlsErr != nil {
				s.l.Error().Err(tlsErr).Msg("Failed to load TLS credentials")
				return s.closer.CloseNotify()
			}
			opts = []grpclib.ServerOption{grpclib.Creds(creds)}
		}
	}
	// Log the panic value and stack, then report codes.Internal to the client.
	grpcPanicRecoveryHandler := func(p any) (err error) {
		s.l.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
		return status.Errorf(codes.Internal, "%s", p)
	}
	// NOTE(review): the validator interceptor is only in the unary chain, so the
	// streaming ListSchemas RPC is not checked by it — confirm this is intended.
	streamChain := []grpclib.StreamServerInterceptor{
		recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}
	unaryChain := []grpclib.UnaryServerInterceptor{
		grpc_validator.UnaryServerInterceptor(),
		recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}
	opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
		grpclib.ChainUnaryInterceptor(unaryChain...),
		grpclib.ChainStreamInterceptor(streamChain...),
	)
	s.ser = grpclib.NewServer(opts...)
	schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService)
	schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService)
	grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())

	s.closer.AddRunning()
	go func() {
		defer s.closer.Done()
		lis, lisErr := net.Listen("tcp", s.addr)
		if lisErr != nil {
			s.l.Error().Err(lisErr).Msg("Failed to listen")
			return
		}
		s.l.Info().Str("addr", s.addr).Msg("Listening to")
		serveErr := s.ser.Serve(lis)
		if serveErr != nil {
			s.l.Error().Err(serveErr).Msg("server is interrupted")
		}
	}()
	return s.closer.CloseNotify()
}
+
+func (s *server) GracefulStop() {
+       if s.tlsReloader != nil {
+               s.tlsReloader.Stop()
+       }
+       stopped := make(chan struct{})
+       go func() {
+               s.ser.GracefulStop()
+               close(stopped)
+       }()
+       t := time.NewTimer(10 * time.Second)
+       select {
+       case <-t.C:
+               s.ser.Stop()
+               s.l.Info().Msg("force stopped")
+       case <-stopped:
+               t.Stop()
+               s.l.Info().Msg("stopped gracefully")
+       }
+       if s.db != nil {
+               if closeErr := s.db.Close(); closeErr != nil {
+                       s.l.Error().Err(closeErr).Msg("failed to close 
database")
+               }
+       }
+       s.closer.CloseThenWait()

Review Comment:
   `GracefulStop()` calls `s.ser.GracefulStop()` unconditionally; if `Serve()` 
was never called (or failed before initializing `s.ser`), this will panic. Add 
a nil check for `s.ser` (and possibly `s.closer`) before stopping.
   ```suggestion
        if s == nil {
                return
        }
        if s.tlsReloader != nil {
                s.tlsReloader.Stop()
        }
        if s.ser != nil {
                stopped := make(chan struct{})
                go func() {
                        s.ser.GracefulStop()
                        close(stopped)
                }()
                t := time.NewTimer(10 * time.Second)
                select {
                case <-t.C:
                        s.ser.Stop()
                        s.l.Info().Msg("force stopped")
                case <-stopped:
                        t.Stop()
                        s.l.Info().Msg("stopped gracefully")
                }
        }
        if s.db != nil {
                if closeErr := s.db.Close(); closeErr != nil {
                        s.l.Error().Err(closeErr).Msg("failed to close 
database")
                }
        }
        if s.closer != nil {
                s.closer.CloseThenWait()
        }
   ```



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schemaserver implements a standalone gRPC server for schema 
property management.
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone schema server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone schema server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "schema-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root 
storage path")
+       flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of 
schema server")
+       flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of 
schema server")
+       flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 
5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration")
+       flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS 
if true")
+       flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS 
cert file")
+       flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key 
file")
+       flagS.VarP(&s.maxRecvMsgSize, "schema-server-max-recv-msg-size", "", 
"max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-server-repair-quick-build-tree-time",
+               time.Minute*10, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file 
snapshots to be eligible for deletion")
+       return flagS
+}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920

Review Comment:
   The default port correction in `Validate()` sets `s.port = 17920`, which 
doesn’t match the default flag value (17916) and looks like a typo. This can 
lead to surprising behavior if the port is explicitly set to 0. Align the 
fallback with the documented/default port.
   ```suggestion
                s.port = 17916
   ```



##########
banyand/property/scheduler.go:
##########
@@ -0,0 +1,110 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "math/rand/v2"
+       "time"
+
+       "github.com/robfig/cron/v3"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+type repairScheduler struct {
+       metadata        metadata.Repo
+       gossipMessenger gossip.Messenger
+       scheduler       *timestamp.Scheduler
+}
+
+func newRepairScheduler(ctx context.Context, l *logger.Logger, cronExp string, 
metadata metadata.Repo, messenger gossip.Messenger) (*repairScheduler, error) {
+       r := &repairScheduler{
+               metadata:        metadata,
+               gossipMessenger: messenger,
+               scheduler:       timestamp.NewScheduler(l, 
timestamp.NewClock()),
+       }
+       err := r.scheduler.Register("trigger", 
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+               cronExp, func(time.Time, *logger.Logger) bool {
+                       l.Debug().Msgf("starting background repair gossip")
+                       group, shardNum, nodes, gossipErr := 
r.doRepairGossip(ctx)
+                       if gossipErr != nil {
+                               l.Err(gossipErr).Msg("failed to repair gossip")
+                               return true
+                       }
+                       l.Info().Str("group", group).Uint32("shardNum", 
shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled")
+                       return true
+               })
+       if err != nil {
+               return nil, fmt.Errorf("failed to add repair build tree cron 
task: %w", err)
+       }

Review Comment:
   The returned error says "failed to add repair build tree cron task", but 
this scheduler registers the "trigger" task for background repair gossip. 
Update the message to match the actual task (e.g. "failed to register repair 
gossip trigger cron task: %w") to avoid misleading logs.



##########
banyand/metadata/schema/schemaserver/grpc.go:
##########
@@ -0,0 +1,328 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var schemaServerScope = observability.RootScope.SubScope("schema_server")
+
+type serverMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+       return &serverMetrics{
+               totalStarted:  factory.NewCounter("total_started", "method"),
+               totalFinished: factory.NewCounter("total_finished", "method"),
+               totalErr:      factory.NewCounter("total_err", "method"),
+               totalLatency:  factory.NewCounter("total_latency", "method"),
+       }
+}
+
+type schemaManagementServer struct {
+       schemav1.UnimplementedSchemaManagementServiceServer
+       server  *server
+       l       *logger.Logger
+       metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req 
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "insert")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "insert")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"insert")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       existQuery := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Property.Metadata.Name,
+               Ids:    []string{req.Property.Id},
+       }
+       existing, queryErr := s.server.db.Query(ctx, existQuery)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       for _, result := range existing {
+               if result.DeleteTime() == 0 {
+                       s.metrics.totalErr.Inc(1, "insert")
+                       return nil, fmt.Errorf("schema already exists")
+               }
+       }
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "insert")
+               s.l.Error().Err(updateErr).Msg("failed to insert schema")
+               return nil, updateErr
+       }
+       return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req 
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "update")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "update")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"update")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if updateErr := s.server.db.Update(ctx, defaultShardID, id, 
req.Property); updateErr != nil {
+               s.metrics.totalErr.Inc(1, "update")
+               s.l.Error().Err(updateErr).Msg("failed to update schema")
+               return nil, updateErr
+       }
+       return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+       stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+       if req.Query == nil {
+               return errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "list")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "list")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "list")
+               s.l.Error().Err(queryErr).Msg("failed to list schemas")
+               return queryErr
+       }
+       for batchStart := 0; batchStart < len(results); batchStart += 
listSchemasBatchSize {
+               batchEnd := batchStart + listSchemasBatchSize
+               if batchEnd > len(results) {
+                       batchEnd = len(results)
+               }
+               batch := results[batchStart:batchEnd]
+               props := make([]*propertyv1.Property, 0, len(batch))
+               deleteTimes := make([]int64, 0, len(batch))
+               for _, result := range batch {
+                       var p propertyv1.Property
+                       if unmarshalErr := protojson.Unmarshal(result.Source(), 
&p); unmarshalErr != nil {
+                               s.metrics.totalErr.Inc(1, "list")
+                               return unmarshalErr
+                       }
+                       props = append(props, &p)
+                       deleteTimes = append(deleteTimes, result.DeleteTime())
+               }
+               if sendErr := 
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: 
deleteTimes}); sendErr != nil {
+                       s.metrics.totalErr.Inc(1, "list")
+                       return sendErr
+               }
+       }
+       return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req 
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+       if req.Delete == nil || req.UpdateAt == nil {
+               return nil, errInvalidRequest("delete request is required")
+       }
+       s.metrics.totalStarted.Inc(1, "delete")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "delete")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"delete")
+       }()
+       query := &propertyv1.QueryRequest{
+               Groups: []string{schemaGroup},
+               Name:   req.Delete.Name,
+       }
+       if req.Delete.Id != "" {
+               query.Ids = []string{req.Delete.Id}
+       }
+       results, queryErr := s.server.db.Query(ctx, query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(queryErr).Msg("failed to delete schema")
+               return nil, queryErr
+       }
+       if len(results) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       ids := make([][]byte, 0, len(results))
+       for _, result := range results {
+               if result.DeleteTime() > 0 {
+                       continue
+               }
+               ids = append(ids, result.ID())
+       }
+       if len(ids) == 0 {
+               return &schemav1.DeleteSchemaResponse{Found: false}, nil
+       }
+       if deleteErr := s.server.db.Delete(ctx, ids, req.UpdateAt.AsTime()); 
deleteErr != nil {
+               s.metrics.totalErr.Inc(1, "delete")
+               s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+               return nil, deleteErr
+       }
+       return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req 
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+       if req.Property == nil {
+               return nil, errInvalidRequest("property is required")
+       }
+       if req.Property.Metadata == nil {
+               return nil, errInvalidRequest("metadata should not be nil")
+       }
+       if req.Property.Metadata.ModRevision == 0 {
+               return nil, errInvalidRequest("mod_revision should be set for 
update")
+       }
+       s.metrics.totalStarted.Inc(1, "repair")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "repair")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
"repair")
+       }()
+       req.Property.Metadata.Group = schemaGroup
+       id := db.GetPropertyID(req.Property)
+       if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), 
req.Property, req.DeleteTime); repairErr != nil {
+               s.metrics.totalErr.Inc(1, "repair")
+               s.l.Error().Err(repairErr).Msg("failed to repair schema")
+               return nil, repairErr
+       }
+       return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req 
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+       if req.Query == nil {
+               return nil, errInvalidRequest("query is required")
+       }
+       s.metrics.totalStarted.Inc(1, "exist")
+       start := time.Now()
+       defer func() {
+               s.metrics.totalFinished.Inc(1, "exist")
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+       }()
+       req.Query.Groups = []string{schemaGroup}
+       req.Query.Limit = 1
+       results, queryErr := s.server.db.Query(ctx, req.Query)
+       if queryErr != nil {
+               s.metrics.totalErr.Inc(1, "exist")
+               s.l.Error().Err(queryErr).Msg("failed to check schema 
existence")
+               return nil, queryErr
+       }
+       return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil

Review Comment:
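   `ExistSchema` currently returns `HasSchema=true` if *any* document matches, 
including soft-deleted ones (since it only checks `len(results) > 0`). This 
makes existence checks incorrect after deletion. Filter results by 
`DeleteTime()==0`. Note that the suggestion below does not touch the preceding 
`req.Query.Limit = 1`. With that limit, a soft-deleted row returned first would 
hide a live entry and yield `HasSchema=false`. Remove or raise the limit as well, 
so that a non-deleted entry can still be found.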
   ```suggestion
        hasSchema := false
        for _, r := range results {
                if r.DeleteTime() == 0 {
                        hasSchema = true
                        break
                }
        }
        return &schemav1.ExistSchemaResponse{HasSchema: hasSchema}, nil
   ```



##########
banyand/metadata/schema/schemaserver/grpc_test.go:
##########
@@ -0,0 +1,430 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "net"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+func startTestServer(t *testing.T) (schemav1.SchemaManagementServiceClient, 
schemav1.SchemaUpdateServiceClient) {
+       t.Helper()
+       srv := NewServer(observability.BypassRegistry).(*server)
+       flagSet := srv.FlagSet()
+       require.NoError(t, flagSet.Parse([]string{}))
+       srv.root = t.TempDir()
+       srv.host = "127.0.0.1"
+       srv.port = getFreePort(t)
+       require.NoError(t, srv.Validate())
+       require.NoError(t, srv.PreRun(context.Background()))
+       srv.Serve()
+       t.Cleanup(func() { srv.GracefulStop() })
+       time.Sleep(100 * time.Millisecond)

Review Comment:
   `startTestServer` uses `time.Sleep(100 * time.Millisecond)` to wait for 
server readiness, which can be flaky under load/CI. Consider waiting 
deterministically (e.g., poll the gRPC health service or attempt to dial with 
retries until success / context deadline).
   ```suggestion
   
        // Wait deterministically for the server to start accepting connections.
        deadline := time.Now().Add(5 * time.Second)
        for {
                conn, err := net.DialTimeout("tcp", srv.addr, 
100*time.Millisecond)
                if err == nil {
                        _ = conn.Close()
                        break
                }
                if time.Now().After(deadline) {
                        require.FailNowf(t, "server did not start listening in 
time", "last error: %v", err)
                }
                time.Sleep(50 * time.Millisecond)
        }
   ```



##########
banyand/metadata/schema/schemaserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schemaserver implements a standalone gRPC server for schema 
property management.
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "runtime/debug"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+       grpc_validator 
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       schemaGroup     = "_schema"
+       defaultShardID  = common.ShardID(0)
+       defaultRecvSize = 10 << 20
+)
+
+var (
+       _ run.PreRunner = (*server)(nil)
+       _ run.Config    = (*server)(nil)
+       _ run.Service   = (*server)(nil)
+)
+
+// Server is the interface for the standalone schema server.
+type Server interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetPort() *uint32
+}
+
+type server struct {
+       db                       db.Database
+       lfs                      fs.FileSystem
+       omr                      observability.MetricsRegistry
+       closer                   *run.Closer
+       l                        *logger.Logger
+       ser                      *grpclib.Server
+       tlsReloader              *pkgtls.Reloader
+       schemaService            *schemaManagementServer
+       updateService            *schemaUpdateServer
+       host                     string
+       certFile                 string
+       root                     string
+       keyFile                  string
+       addr                     string
+       repairBuildTreeCron      string
+       minFileSnapshotAge       time.Duration
+       flushTimeout             time.Duration
+       snapshotSeq              uint64
+       expireTimeout            time.Duration
+       repairQuickBuildTreeTime time.Duration
+       maxRecvMsgSize           run.Bytes
+       maxFileSnapshotNum       int
+       repairTreeSlotCount      int
+       snapshotMu               sync.Mutex
+       port                     uint32
+       tls                      bool
+}
+
+// NewServer returns a new standalone schema server.
+func NewServer(omr observability.MetricsRegistry) Server {
+       return &server{
+               omr:    omr,
+               closer: run.NewCloser(0),
+       }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+       return &s.port
+}
+
+func (s *server) Name() string {
+       return "schema-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-server")
+       s.maxRecvMsgSize = defaultRecvSize
+       flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root 
storage path")
+       flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of 
schema server")
+       flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of 
schema server")
+       flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 
5*time.Second, "memory flush interval")
+       flagS.DurationVar(&s.expireTimeout, 
"schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration")
+       flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS 
if true")
+       flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS 
cert file")
+       flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key 
file")
+       flagS.VarP(&s.maxRecvMsgSize, "schema-server-max-recv-msg-size", "", 
"max gRPC receive message size")
+       flagS.IntVar(&s.repairTreeSlotCount, 
"schema-server-repair-tree-slot-count", 32, "repair tree slot count")
+       flagS.StringVar(&s.repairBuildTreeCron, 
"schema-server-repair-build-tree-cron", "@every 1h",
+               "cron for repair tree building")
+       flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"schema-server-repair-quick-build-tree-time",
+               time.Minute*10, "schema-quick build tree duration")
+       flagS.IntVar(&s.maxFileSnapshotNum, 
"schema-server-max-file-snapshot-num", 10, "the maximum number of file 
snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file 
snapshots to be eligible for deletion")
+       return flagS
+}
+
+func (s *server) Validate() error {
+       if s.root == "" {
+               return errors.New("root path must not be empty")
+       }
+       if s.port == 0 {
+               s.port = 17920
+       }
+       s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 
10))
+       if s.addr == ":" {
+               return errors.New("no address")
+       }
+       if !s.tls {
+               return nil
+       }
+       if s.certFile == "" {
+               return errors.New("invalid server cert file")
+       }
+       if s.keyFile == "" {
+               return errors.New("invalid server key file")
+       }
+       return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+       s.l = logger.GetLogger("schema-server")
+       s.lfs = fs.NewLocalFileSystem()
+
+       grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc"))
+       sm := newServerMetrics(grpcFactory)
+       s.schemaService = &schemaManagementServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+       s.updateService = &schemaUpdateServer{
+               server:  s,
+               l:       s.l,
+               metrics: sm,
+       }
+
+       if s.tls {
+               var tlsErr error
+               s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, 
s.keyFile, s.l)
+               if tlsErr != nil {
+                       return errors.Wrap(tlsErr, "failed to initialize TLS 
reloader for server")
+               }
+       }
+
+       dataDir := filepath.Join(s.root, "schema-property", "data")
+       snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+       repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+       cfg := db.Config{
+               Location:               dataDir,
+               FlushInterval:          s.flushTimeout,
+               ExpireToDeleteDuration: s.expireTimeout,
+               Repair: db.RepairConfig{
+                       Enabled:            true,
+                       Location:           repairDir,
+                       BuildTreeCron:      s.repairBuildTreeCron,
+                       QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+                       TreeSlotCount:      s.repairTreeSlotCount,
+               },
+               Index: db.IndexConfig{
+                       BatchWaitSec:       5,
+                       WaitForPersistence: false,
+               },
+               Snapshot: db.SnapshotConfig{
+                       Location: snapshotDir,
+                       Func: func(ctx context.Context) (string, error) {
+                               s.snapshotMu.Lock()
+                               defer s.snapshotMu.Unlock()
+                               storage.DeleteStaleSnapshots(snapshotDir, 
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+                               sn := s.snapshotName()
+                               snapshot := s.db.TakeSnapShot(ctx, sn)
+                               if snapshot.Error != "" {
+                                       return "", fmt.Errorf("failed to find 
snapshot %s: %s", sn, snapshot.Error)

Review Comment:
   The snapshot error message says "failed to find snapshot", but this code is 
creating (taking) a snapshot. Consider changing the wording to "failed to take 
snapshot" (keeping the underlying error in the message) to avoid confusion when 
troubleshooting.
   ```suggestion
                                        return "", fmt.Errorf("failed to take 
snapshot %s: %s", sn, snapshot.Error)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to