Copilot commented on code in PR #971: URL: https://github.com/apache/skywalking-banyandb/pull/971#discussion_r2796374791
########## banyand/metadata/schema/schemaserver/grpc.go: ########## @@ -0,0 +1,328 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schemaserver + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +var schemaServerScope = observability.RootScope.SubScope("schema_server") + +type serverMetrics struct { + totalStarted meter.Counter + totalFinished meter.Counter + totalErr meter.Counter + totalLatency meter.Counter +} + +func newServerMetrics(factory observability.Factory) *serverMetrics { + return &serverMetrics{ + totalStarted: factory.NewCounter("total_started", "method"), + totalFinished: factory.NewCounter("total_finished", "method"), + totalErr: factory.NewCounter("total_err", "method"), + totalLatency: 
factory.NewCounter("total_latency", "method"), + } +} + +type schemaManagementServer struct { + schemav1.UnimplementedSchemaManagementServiceServer + server *server + l *logger.Logger + metrics *serverMetrics +} + +// InsertSchema inserts a new schema property. +func (s *schemaManagementServer) InsertSchema(ctx context.Context, req *schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") Review Comment: The validation error message for `InsertSchema` says "mod_revision should be set for update", which is confusing on the insert path. The same check and message also appear in `UpdateSchema` and `RepairSchema`, so consider a single operation-neutral wording, reused consistently across insert/update/repair, that is accurate on every path. ```suggestion return nil, errInvalidRequest("mod_revision should be set") ``` ########## banyand/metadata/schema/schemaserver/service.go: ########## @@ -0,0 +1,322 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schemaserver implements a standalone gRPC server for schema property management. +package schemaserver + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" +) + +const ( + schemaGroup = "_schema" + defaultShardID = common.ShardID(0) + defaultRecvSize = 10 << 20 +) + +var ( + _ run.PreRunner = (*server)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) +) + +// Server is the interface for the standalone schema server. 
+type Server interface { + run.PreRunner + run.Config + run.Service + GetPort() *uint32 +} + +type server struct { + db db.Database + lfs fs.FileSystem + omr observability.MetricsRegistry + closer *run.Closer + l *logger.Logger + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + schemaService *schemaManagementServer + updateService *schemaUpdateServer + host string + certFile string + root string + keyFile string + addr string + repairBuildTreeCron string + minFileSnapshotAge time.Duration + flushTimeout time.Duration + snapshotSeq uint64 + expireTimeout time.Duration + repairQuickBuildTreeTime time.Duration + maxRecvMsgSize run.Bytes + maxFileSnapshotNum int + repairTreeSlotCount int + snapshotMu sync.Mutex + port uint32 + tls bool +} + +// NewServer returns a new standalone schema server. +func NewServer(omr observability.MetricsRegistry) Server { + return &server{ + omr: omr, + closer: run.NewCloser(0), + } +} + +// GetPort returns the gRPC server port. +func (s *server) GetPort() *uint32 { + return &s.port +} + +func (s *server) Name() string { + return "schema-server" +} + +func (s *server) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("schema-server") + s.maxRecvMsgSize = defaultRecvSize + flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path") + flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server") + flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server") + flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval") + flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration") + flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true") + flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file") + flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file") + flagS.VarP(&s.maxRecvMsgSize, 
"schema-server-max-recv-msg-size", "", "max gRPC receive message size") + flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count") + flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h", + "cron for repair tree building") + flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time", + time.Minute*10, "schema-quick build tree duration") + flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion") + return flagS +} + +func (s *server) Validate() error { + if s.root == "" { + return errors.New("root path must not be empty") + } + if s.port == 0 { + s.port = 17920 + } + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { + return errors.New("no address") + } + if !s.tls { + return nil + } + if s.certFile == "" { + return errors.New("invalid server cert file") + } + if s.keyFile == "" { + return errors.New("invalid server key file") + } + return nil +} + +func (s *server) PreRun(_ context.Context) error { + s.l = logger.GetLogger("schema-server") + s.lfs = fs.NewLocalFileSystem() + + grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc")) + sm := newServerMetrics(grpcFactory) + s.schemaService = &schemaManagementServer{ + server: s, + l: s.l, + metrics: sm, + } + s.updateService = &schemaUpdateServer{ + server: s, + l: s.l, + metrics: sm, + } + + if s.tls { + var tlsErr error + s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, s.keyFile, s.l) + if tlsErr != nil { + return errors.Wrap(tlsErr, "failed to initialize TLS reloader for server") + } + } + + dataDir := filepath.Join(s.root, "schema-property", "data") + snapshotDir := filepath.Join(s.root, "schema-property", 
"snapshots") + repairDir := filepath.Join(s.root, "schema-property", "repair") + + cfg := db.Config{ + Location: dataDir, + FlushInterval: s.flushTimeout, + ExpireToDeleteDuration: s.expireTimeout, + Repair: db.RepairConfig{ + Enabled: true, + Location: repairDir, + BuildTreeCron: s.repairBuildTreeCron, + QuickBuildTreeTime: s.repairQuickBuildTreeTime, + TreeSlotCount: s.repairTreeSlotCount, + }, + Index: db.IndexConfig{ + BatchWaitSec: 5, + WaitForPersistence: false, + }, + Snapshot: db.SnapshotConfig{ + Location: snapshotDir, + Func: func(ctx context.Context) (string, error) { + s.snapshotMu.Lock() + defer s.snapshotMu.Unlock() + storage.DeleteStaleSnapshots(snapshotDir, s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs) + sn := s.snapshotName() + snapshot := s.db.TakeSnapShot(ctx, sn) + if snapshot.Error != "" { + return "", fmt.Errorf("failed to find snapshot %s: %s", sn, snapshot.Error) + } + return path.Join(snapshotDir, sn, storage.DataDir), nil + }, + }, + } + + var openErr error + // nolint:contextcheck + s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs) + if openErr != nil { + return errors.Wrap(openErr, "failed to open property database") + } + s.l.Info().Str("root", s.root).Msg("schema property database initialized") + return nil +} + +func (s *server) snapshotName() string { + s.snapshotSeq++ + return fmt.Sprintf("%s-%08X", time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq) +} + +func (s *server) Serve() run.StopNotify { + var opts []grpclib.ServerOption + if s.tls { + if s.tlsReloader != nil { + if startErr := s.tlsReloader.Start(); startErr != nil { + s.l.Error().Err(startErr).Msg("Failed to start TLS reloader for schema server") + return s.closer.CloseNotify() + } + s.l.Info().Str("certFile", s.certFile).Str("keyFile", s.keyFile). 
+ Msg("Started TLS file monitoring for schema server") + tlsConfig := s.tlsReloader.GetTLSConfig() + creds := credentials.NewTLS(tlsConfig) + opts = []grpclib.ServerOption{grpclib.Creds(creds)} + } else { + creds, tlsErr := credentials.NewServerTLSFromFile(s.certFile, s.keyFile) + if tlsErr != nil { + s.l.Error().Err(tlsErr).Msg("Failed to load TLS credentials") + return s.closer.CloseNotify() + } + opts = []grpclib.ServerOption{grpclib.Creds(creds)} + } + } + grpcPanicRecoveryHandler := func(p any) (err error) { + s.l.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic") + return status.Errorf(codes.Internal, "%s", p) + } + streamChain := []grpclib.StreamServerInterceptor{ + recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + } + unaryChain := []grpclib.UnaryServerInterceptor{ + grpc_validator.UnaryServerInterceptor(), + recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + } + opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)), + grpclib.ChainUnaryInterceptor(unaryChain...), + grpclib.ChainStreamInterceptor(streamChain...), + ) + s.ser = grpclib.NewServer(opts...) + schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService) + schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService) + grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer()) + + s.closer.AddRunning() + go func() { + defer s.closer.Done() + lis, lisErr := net.Listen("tcp", s.addr) + if lisErr != nil { + s.l.Error().Err(lisErr).Msg("Failed to listen") + return + } Review Comment: On `net.Listen` failure, the goroutine only logs and returns without canceling/closing `s.closer`, so the StopNotify channel that `Serve()` has already returned to its caller never closes. Consider signaling the failure by closing/canceling the closer (without waiting on the running counter from inside this goroutine, which still holds a running slot until its deferred `Done()` runs and would therefore deadlock), so callers waiting on that channel don’t block indefinitely.
########## banyand/metadata/schema/schemaserver/service.go: ########## @@ -0,0 +1,322 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schemaserver implements a standalone gRPC server for schema property management. 
+package schemaserver + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" +) + +const ( + schemaGroup = "_schema" + defaultShardID = common.ShardID(0) + defaultRecvSize = 10 << 20 +) + +var ( + _ run.PreRunner = (*server)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) +) + +// Server is the interface for the standalone schema server. 
+type Server interface { + run.PreRunner + run.Config + run.Service + GetPort() *uint32 +} + +type server struct { + db db.Database + lfs fs.FileSystem + omr observability.MetricsRegistry + closer *run.Closer + l *logger.Logger + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + schemaService *schemaManagementServer + updateService *schemaUpdateServer + host string + certFile string + root string + keyFile string + addr string + repairBuildTreeCron string + minFileSnapshotAge time.Duration + flushTimeout time.Duration + snapshotSeq uint64 + expireTimeout time.Duration + repairQuickBuildTreeTime time.Duration + maxRecvMsgSize run.Bytes + maxFileSnapshotNum int + repairTreeSlotCount int + snapshotMu sync.Mutex + port uint32 + tls bool +} + +// NewServer returns a new standalone schema server. +func NewServer(omr observability.MetricsRegistry) Server { + return &server{ + omr: omr, + closer: run.NewCloser(0), + } +} + +// GetPort returns the gRPC server port. +func (s *server) GetPort() *uint32 { + return &s.port +} + +func (s *server) Name() string { + return "schema-server" +} + +func (s *server) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("schema-server") + s.maxRecvMsgSize = defaultRecvSize + flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path") + flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server") + flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server") + flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval") + flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration") + flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true") + flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file") + flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file") + flagS.VarP(&s.maxRecvMsgSize, 
"schema-server-max-recv-msg-size", "", "max gRPC receive message size") + flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count") + flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h", + "cron for repair tree building") + flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time", + time.Minute*10, "schema-quick build tree duration") + flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion") + return flagS +} + +func (s *server) Validate() error { + if s.root == "" { + return errors.New("root path must not be empty") + } + if s.port == 0 { + s.port = 17920 + } + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { + return errors.New("no address") + } + if !s.tls { + return nil + } + if s.certFile == "" { + return errors.New("invalid server cert file") + } + if s.keyFile == "" { + return errors.New("invalid server key file") + } + return nil +} + +func (s *server) PreRun(_ context.Context) error { + s.l = logger.GetLogger("schema-server") + s.lfs = fs.NewLocalFileSystem() + + grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc")) + sm := newServerMetrics(grpcFactory) + s.schemaService = &schemaManagementServer{ + server: s, + l: s.l, + metrics: sm, + } + s.updateService = &schemaUpdateServer{ + server: s, + l: s.l, + metrics: sm, + } + + if s.tls { + var tlsErr error + s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, s.keyFile, s.l) + if tlsErr != nil { + return errors.Wrap(tlsErr, "failed to initialize TLS reloader for server") + } + } + + dataDir := filepath.Join(s.root, "schema-property", "data") + snapshotDir := filepath.Join(s.root, "schema-property", 
"snapshots") + repairDir := filepath.Join(s.root, "schema-property", "repair") + + cfg := db.Config{ + Location: dataDir, + FlushInterval: s.flushTimeout, + ExpireToDeleteDuration: s.expireTimeout, + Repair: db.RepairConfig{ + Enabled: true, + Location: repairDir, + BuildTreeCron: s.repairBuildTreeCron, + QuickBuildTreeTime: s.repairQuickBuildTreeTime, + TreeSlotCount: s.repairTreeSlotCount, + }, + Index: db.IndexConfig{ + BatchWaitSec: 5, + WaitForPersistence: false, + }, + Snapshot: db.SnapshotConfig{ + Location: snapshotDir, + Func: func(ctx context.Context) (string, error) { + s.snapshotMu.Lock() + defer s.snapshotMu.Unlock() + storage.DeleteStaleSnapshots(snapshotDir, s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs) + sn := s.snapshotName() + snapshot := s.db.TakeSnapShot(ctx, sn) + if snapshot.Error != "" { + return "", fmt.Errorf("failed to find snapshot %s: %s", sn, snapshot.Error) + } + return path.Join(snapshotDir, sn, storage.DataDir), nil + }, Review Comment: `path.Join(...)` is used to build filesystem paths under `snapshotDir`. On non-Unix platforms this can produce incorrect separators; the rest of the function uses `filepath.Join`. Prefer `filepath.Join` here as well for consistency and OS-correct path handling. ########## banyand/metadata/schema/schemaserver/grpc.go: ########## @@ -0,0 +1,328 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schemaserver + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +var schemaServerScope = observability.RootScope.SubScope("schema_server") + +type serverMetrics struct { + totalStarted meter.Counter + totalFinished meter.Counter + totalErr meter.Counter + totalLatency meter.Counter +} + +func newServerMetrics(factory observability.Factory) *serverMetrics { + return &serverMetrics{ + totalStarted: factory.NewCounter("total_started", "method"), + totalFinished: factory.NewCounter("total_finished", "method"), + totalErr: factory.NewCounter("total_err", "method"), + totalLatency: factory.NewCounter("total_latency", "method"), + } +} + +type schemaManagementServer struct { + schemav1.UnimplementedSchemaManagementServiceServer + server *server + l *logger.Logger + metrics *serverMetrics +} + +// InsertSchema inserts a new schema property. 
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req *schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "insert") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "insert") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "insert") + }() + req.Property.Metadata.Group = schemaGroup + existQuery := &propertyv1.QueryRequest{ + Groups: []string{schemaGroup}, + Name: req.Property.Metadata.Name, + Ids: []string{req.Property.Id}, + } + existing, queryErr := s.server.db.Query(ctx, existQuery) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "insert") + s.l.Error().Err(queryErr).Msg("failed to check schema existence") + return nil, queryErr + } + for _, result := range existing { + if result.DeleteTime() == 0 { + s.metrics.totalErr.Inc(1, "insert") + return nil, fmt.Errorf("schema already exists") + } + } + id := db.GetPropertyID(req.Property) + if updateErr := s.server.db.Update(ctx, defaultShardID, id, req.Property); updateErr != nil { + s.metrics.totalErr.Inc(1, "insert") + s.l.Error().Err(updateErr).Msg("failed to insert schema") + return nil, updateErr + } + return &schemav1.InsertSchemaResponse{}, nil +} + +// UpdateSchema updates an existing schema property. 
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req *schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "update") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "update") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "update") + }() + req.Property.Metadata.Group = schemaGroup + id := db.GetPropertyID(req.Property) + if updateErr := s.server.db.Update(ctx, defaultShardID, id, req.Property); updateErr != nil { + s.metrics.totalErr.Inc(1, "update") + s.l.Error().Err(updateErr).Msg("failed to update schema") + return nil, updateErr + } + return &schemav1.UpdateSchemaResponse{}, nil +} + +const listSchemasBatchSize = 100 + +// ListSchemas lists schema properties via server streaming. 
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest, + stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse], +) error { + if req.Query == nil { + return errInvalidRequest("query is required") + } + s.metrics.totalStarted.Inc(1, "list") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "list") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list") + }() + req.Query.Groups = []string{schemaGroup} + results, queryErr := s.server.db.Query(stream.Context(), req.Query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "list") + s.l.Error().Err(queryErr).Msg("failed to list schemas") + return queryErr + } + for batchStart := 0; batchStart < len(results); batchStart += listSchemasBatchSize { + batchEnd := batchStart + listSchemasBatchSize + if batchEnd > len(results) { + batchEnd = len(results) + } + batch := results[batchStart:batchEnd] + props := make([]*propertyv1.Property, 0, len(batch)) + deleteTimes := make([]int64, 0, len(batch)) + for _, result := range batch { + var p propertyv1.Property + if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr != nil { + s.metrics.totalErr.Inc(1, "list") + return unmarshalErr + } + props = append(props, &p) + deleteTimes = append(deleteTimes, result.DeleteTime()) + } + if sendErr := stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: deleteTimes}); sendErr != nil { + s.metrics.totalErr.Inc(1, "list") + return sendErr + } + } + return nil +} + +// DeleteSchema deletes a schema property. 
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req *schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) { + if req.Delete == nil || req.UpdateAt == nil { + return nil, errInvalidRequest("delete request is required") + } + s.metrics.totalStarted.Inc(1, "delete") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "delete") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "delete") + }() + query := &propertyv1.QueryRequest{ + Groups: []string{schemaGroup}, + Name: req.Delete.Name, + } + if req.Delete.Id != "" { + query.Ids = []string{req.Delete.Id} + } + results, queryErr := s.server.db.Query(ctx, query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "delete") + s.l.Error().Err(queryErr).Msg("failed to delete schema") + return nil, queryErr + } + if len(results) == 0 { + return &schemav1.DeleteSchemaResponse{Found: false}, nil + } + ids := make([][]byte, 0, len(results)) + for _, result := range results { + if result.DeleteTime() > 0 { + continue + } + ids = append(ids, result.ID()) + } + if len(ids) == 0 { + return &schemav1.DeleteSchemaResponse{Found: false}, nil + } + if deleteErr := s.server.db.Delete(ctx, ids, req.UpdateAt.AsTime()); deleteErr != nil { + s.metrics.totalErr.Inc(1, "delete") + s.l.Error().Err(deleteErr).Msg("failed to delete schema") + return nil, deleteErr + } + return &schemav1.DeleteSchemaResponse{Found: true}, nil +} + +// RepairSchema repairs a schema property with the specified delete time. 
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req *schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "repair") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "repair") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "repair") + }() + req.Property.Metadata.Group = schemaGroup + id := db.GetPropertyID(req.Property) + if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), req.Property, req.DeleteTime); repairErr != nil { + s.metrics.totalErr.Inc(1, "repair") + s.l.Error().Err(repairErr).Msg("failed to repair schema") + return nil, repairErr + } + return &schemav1.RepairSchemaResponse{}, nil +} + +// ExistSchema checks if a schema property exists. 
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req *schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) { + if req.Query == nil { + return nil, errInvalidRequest("query is required") + } + s.metrics.totalStarted.Inc(1, "exist") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "exist") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist") + }() + req.Query.Groups = []string{schemaGroup} + req.Query.Limit = 1 + results, queryErr := s.server.db.Query(ctx, req.Query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "exist") + s.l.Error().Err(queryErr).Msg("failed to check schema existence") + return nil, queryErr + } + return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil +} + +type schemaUpdateServer struct { + schemav1.UnimplementedSchemaUpdateServiceServer + server *server + l *logger.Logger + metrics *serverMetrics +} + +// AggregateSchemaUpdates returns distinct schema names that have been modified. 
+func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context, + req *schemav1.AggregateSchemaUpdatesRequest, +) (*schemav1.AggregateSchemaUpdatesResponse, error) { + s.metrics.totalStarted.Inc(1, "aggregate") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "aggregate") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "aggregate") + }() + if req.Query == nil { + return nil, errInvalidRequest("query is required") + } + req.Query.Groups = []string{schemaGroup} + results, queryErr := s.server.db.Query(ctx, req.Query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "aggregate") + s.l.Error().Err(queryErr).Msg("failed to aggregate schema updates") + return nil, queryErr + } + nameSet := make(map[string]struct{}, len(results)) + for _, result := range results { + var p propertyv1.Property + if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr != nil { + s.metrics.totalErr.Inc(1, "aggregate") + return nil, unmarshalErr + } + if p.Metadata != nil && p.Metadata.Name != "" { + nameSet[p.Metadata.Name] = struct{}{} + } + } + names := make([]string, 0, len(nameSet)) + for name := range nameSet { + names = append(names, name) + } + return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil +} + +func errInvalidRequest(msg string) error { + return &invalidRequestError{msg: msg} +} + +type invalidRequestError struct { + msg string +} + +func (e *invalidRequestError) Error() string { + return "invalid request: " + e.msg +} Review Comment: `errInvalidRequest(...)` returns a plain Go error, so callers will receive gRPC code `Unknown` instead of `InvalidArgument`. Consider returning `status.Error(codes.InvalidArgument, ...)` (or implementing `GRPCStatus()` on the custom error) and similarly use appropriate status codes like `AlreadyExists` for duplicate inserts. 
########## banyand/metadata/schema/schemaserver/service.go: ########## @@ -0,0 +1,322 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schemaserver implements a standalone gRPC server for schema property management. 
+package schemaserver + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" +) + +const ( + schemaGroup = "_schema" + defaultShardID = common.ShardID(0) + defaultRecvSize = 10 << 20 +) + +var ( + _ run.PreRunner = (*server)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) +) + +// Server is the interface for the standalone schema server. 
+type Server interface { + run.PreRunner + run.Config + run.Service + GetPort() *uint32 +} + +type server struct { + db db.Database + lfs fs.FileSystem + omr observability.MetricsRegistry + closer *run.Closer + l *logger.Logger + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + schemaService *schemaManagementServer + updateService *schemaUpdateServer + host string + certFile string + root string + keyFile string + addr string + repairBuildTreeCron string + minFileSnapshotAge time.Duration + flushTimeout time.Duration + snapshotSeq uint64 + expireTimeout time.Duration + repairQuickBuildTreeTime time.Duration + maxRecvMsgSize run.Bytes + maxFileSnapshotNum int + repairTreeSlotCount int + snapshotMu sync.Mutex + port uint32 + tls bool +} + +// NewServer returns a new standalone schema server. +func NewServer(omr observability.MetricsRegistry) Server { + return &server{ + omr: omr, + closer: run.NewCloser(0), + } +} + +// GetPort returns the gRPC server port. +func (s *server) GetPort() *uint32 { + return &s.port +} + +func (s *server) Name() string { + return "schema-server" +} + +func (s *server) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("schema-server") + s.maxRecvMsgSize = defaultRecvSize + flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path") + flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server") + flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server") + flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval") + flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration") + flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true") + flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file") + flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file") + flagS.VarP(&s.maxRecvMsgSize, 
"schema-server-max-recv-msg-size", "", "max gRPC receive message size") + flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count") + flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h", + "cron for repair tree building") + flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time", + time.Minute*10, "schema-quick build tree duration") + flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion") + return flagS +} + +func (s *server) Validate() error { + if s.root == "" { + return errors.New("root path must not be empty") + } + if s.port == 0 { + s.port = 17920 + } + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { + return errors.New("no address") + } + if !s.tls { + return nil + } + if s.certFile == "" { + return errors.New("invalid server cert file") + } + if s.keyFile == "" { + return errors.New("invalid server key file") + } + return nil +} + +func (s *server) PreRun(_ context.Context) error { + s.l = logger.GetLogger("schema-server") + s.lfs = fs.NewLocalFileSystem() + + grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc")) + sm := newServerMetrics(grpcFactory) + s.schemaService = &schemaManagementServer{ + server: s, + l: s.l, + metrics: sm, + } + s.updateService = &schemaUpdateServer{ + server: s, + l: s.l, + metrics: sm, + } + + if s.tls { + var tlsErr error + s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, s.keyFile, s.l) + if tlsErr != nil { + return errors.Wrap(tlsErr, "failed to initialize TLS reloader for server") + } + } + + dataDir := filepath.Join(s.root, "schema-property", "data") + snapshotDir := filepath.Join(s.root, "schema-property", 
"snapshots") + repairDir := filepath.Join(s.root, "schema-property", "repair") + + cfg := db.Config{ + Location: dataDir, + FlushInterval: s.flushTimeout, + ExpireToDeleteDuration: s.expireTimeout, + Repair: db.RepairConfig{ + Enabled: true, + Location: repairDir, + BuildTreeCron: s.repairBuildTreeCron, + QuickBuildTreeTime: s.repairQuickBuildTreeTime, + TreeSlotCount: s.repairTreeSlotCount, + }, + Index: db.IndexConfig{ + BatchWaitSec: 5, + WaitForPersistence: false, + }, + Snapshot: db.SnapshotConfig{ + Location: snapshotDir, + Func: func(ctx context.Context) (string, error) { + s.snapshotMu.Lock() + defer s.snapshotMu.Unlock() + storage.DeleteStaleSnapshots(snapshotDir, s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs) + sn := s.snapshotName() + snapshot := s.db.TakeSnapShot(ctx, sn) + if snapshot.Error != "" { + return "", fmt.Errorf("failed to find snapshot %s: %s", sn, snapshot.Error) + } + return path.Join(snapshotDir, sn, storage.DataDir), nil + }, + }, + } + + var openErr error + // nolint:contextcheck + s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs) + if openErr != nil { + return errors.Wrap(openErr, "failed to open property database") + } + s.l.Info().Str("root", s.root).Msg("schema property database initialized") + return nil +} + +func (s *server) snapshotName() string { + s.snapshotSeq++ + return fmt.Sprintf("%s-%08X", time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq) +} + +func (s *server) Serve() run.StopNotify { + var opts []grpclib.ServerOption + if s.tls { + if s.tlsReloader != nil { + if startErr := s.tlsReloader.Start(); startErr != nil { + s.l.Error().Err(startErr).Msg("Failed to start TLS reloader for schema server") + return s.closer.CloseNotify() + } + s.l.Info().Str("certFile", s.certFile).Str("keyFile", s.keyFile). 
+ Msg("Started TLS file monitoring for schema server") + tlsConfig := s.tlsReloader.GetTLSConfig() + creds := credentials.NewTLS(tlsConfig) + opts = []grpclib.ServerOption{grpclib.Creds(creds)} + } else { + creds, tlsErr := credentials.NewServerTLSFromFile(s.certFile, s.keyFile) + if tlsErr != nil { + s.l.Error().Err(tlsErr).Msg("Failed to load TLS credentials") + return s.closer.CloseNotify() + } + opts = []grpclib.ServerOption{grpclib.Creds(creds)} + } + } + grpcPanicRecoveryHandler := func(p any) (err error) { + s.l.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic") + return status.Errorf(codes.Internal, "%s", p) + } + streamChain := []grpclib.StreamServerInterceptor{ + recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + } + unaryChain := []grpclib.UnaryServerInterceptor{ + grpc_validator.UnaryServerInterceptor(), + recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + } + opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)), + grpclib.ChainUnaryInterceptor(unaryChain...), + grpclib.ChainStreamInterceptor(streamChain...), + ) + s.ser = grpclib.NewServer(opts...) 
+ schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService) + schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService) + grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer()) + + s.closer.AddRunning() + go func() { + defer s.closer.Done() + lis, lisErr := net.Listen("tcp", s.addr) + if lisErr != nil { + s.l.Error().Err(lisErr).Msg("Failed to listen") + return + } + s.l.Info().Str("addr", s.addr).Msg("Listening to") + serveErr := s.ser.Serve(lis) + if serveErr != nil { + s.l.Error().Err(serveErr).Msg("server is interrupted") + } + }() + return s.closer.CloseNotify() +} + +func (s *server) GracefulStop() { + if s.tlsReloader != nil { + s.tlsReloader.Stop() + } + stopped := make(chan struct{}) + go func() { + s.ser.GracefulStop() + close(stopped) + }() + t := time.NewTimer(10 * time.Second) + select { + case <-t.C: + s.ser.Stop() + s.l.Info().Msg("force stopped") + case <-stopped: + t.Stop() + s.l.Info().Msg("stopped gracefully") + } + if s.db != nil { + if closeErr := s.db.Close(); closeErr != nil { + s.l.Error().Err(closeErr).Msg("failed to close database") + } + } + s.closer.CloseThenWait() Review Comment: `GracefulStop()` calls `s.ser.GracefulStop()` unconditionally; if `Serve()` was never called (or failed before initializing `s.ser`), this will panic. Add a nil check for `s.ser` (and possibly `s.closer`) before stopping. 
```suggestion if s == nil { return } if s.tlsReloader != nil { s.tlsReloader.Stop() } if s.ser != nil { stopped := make(chan struct{}) go func() { s.ser.GracefulStop() close(stopped) }() t := time.NewTimer(10 * time.Second) select { case <-t.C: s.ser.Stop() s.l.Info().Msg("force stopped") case <-stopped: t.Stop() s.l.Info().Msg("stopped gracefully") } } if s.db != nil { if closeErr := s.db.Close(); closeErr != nil { s.l.Error().Err(closeErr).Msg("failed to close database") } } if s.closer != nil { s.closer.CloseThenWait() } ``` ########## banyand/metadata/schema/schemaserver/service.go: ########## @@ -0,0 +1,322 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schemaserver implements a standalone gRPC server for schema property management. 
+package schemaserver + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" +) + +const ( + schemaGroup = "_schema" + defaultShardID = common.ShardID(0) + defaultRecvSize = 10 << 20 +) + +var ( + _ run.PreRunner = (*server)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) +) + +// Server is the interface for the standalone schema server. 
+type Server interface { + run.PreRunner + run.Config + run.Service + GetPort() *uint32 +} + +type server struct { + db db.Database + lfs fs.FileSystem + omr observability.MetricsRegistry + closer *run.Closer + l *logger.Logger + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + schemaService *schemaManagementServer + updateService *schemaUpdateServer + host string + certFile string + root string + keyFile string + addr string + repairBuildTreeCron string + minFileSnapshotAge time.Duration + flushTimeout time.Duration + snapshotSeq uint64 + expireTimeout time.Duration + repairQuickBuildTreeTime time.Duration + maxRecvMsgSize run.Bytes + maxFileSnapshotNum int + repairTreeSlotCount int + snapshotMu sync.Mutex + port uint32 + tls bool +} + +// NewServer returns a new standalone schema server. +func NewServer(omr observability.MetricsRegistry) Server { + return &server{ + omr: omr, + closer: run.NewCloser(0), + } +} + +// GetPort returns the gRPC server port. +func (s *server) GetPort() *uint32 { + return &s.port +} + +func (s *server) Name() string { + return "schema-server" +} + +func (s *server) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("schema-server") + s.maxRecvMsgSize = defaultRecvSize + flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path") + flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server") + flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server") + flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval") + flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration") + flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true") + flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file") + flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file") + flagS.VarP(&s.maxRecvMsgSize, 
"schema-server-max-recv-msg-size", "", "max gRPC receive message size") + flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count") + flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h", + "cron for repair tree building") + flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time", + time.Minute*10, "schema-quick build tree duration") + flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion") + return flagS +} + +func (s *server) Validate() error { + if s.root == "" { + return errors.New("root path must not be empty") + } + if s.port == 0 { + s.port = 17920 Review Comment: The default port correction in `Validate()` sets `s.port = 17920`, which doesn’t match the default flag value (17916) and looks like a typo. This can lead to surprising behavior if the port is explicitly set to 0. Align the fallback with the documented/default port. ```suggestion s.port = 17916 ``` ########## banyand/property/scheduler.go: ########## @@ -0,0 +1,110 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "math/rand/v2" + "time" + + "github.com/robfig/cron/v3" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +type repairScheduler struct { + metadata metadata.Repo + gossipMessenger gossip.Messenger + scheduler *timestamp.Scheduler +} + +func newRepairScheduler(ctx context.Context, l *logger.Logger, cronExp string, metadata metadata.Repo, messenger gossip.Messenger) (*repairScheduler, error) { + r := &repairScheduler{ + metadata: metadata, + gossipMessenger: messenger, + scheduler: timestamp.NewScheduler(l, timestamp.NewClock()), + } + err := r.scheduler.Register("trigger", cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor, + cronExp, func(time.Time, *logger.Logger) bool { + l.Debug().Msgf("starting background repair gossip") + group, shardNum, nodes, gossipErr := r.doRepairGossip(ctx) + if gossipErr != nil { + l.Err(gossipErr).Msg("failed to repair gossip") + return true + } + l.Info().Str("group", group).Uint32("shardNum", shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled") + return true + }) + if err != nil { + return nil, fmt.Errorf("failed to add repair build tree cron task: %w", err) + } Review Comment: The returned error says "failed to add repair build tree cron task", but this 
scheduler registers the "trigger" task for background repair gossip. Update the message to match the actual task (for example, "failed to register repair gossip trigger task: %w") to avoid misleading logs. ########## banyand/metadata/schema/schemaserver/grpc.go: ########## @@ -0,0 +1,328 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package schemaserver + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +var schemaServerScope = observability.RootScope.SubScope("schema_server") + +type serverMetrics struct { + totalStarted meter.Counter + totalFinished meter.Counter + totalErr meter.Counter + totalLatency meter.Counter +} + +func newServerMetrics(factory observability.Factory) *serverMetrics { + return &serverMetrics{ + totalStarted: factory.NewCounter("total_started", "method"), + totalFinished: factory.NewCounter("total_finished", "method"), + totalErr: factory.NewCounter("total_err", "method"), + totalLatency: factory.NewCounter("total_latency", "method"), + } +} + +type schemaManagementServer struct { + schemav1.UnimplementedSchemaManagementServiceServer + server *server + l *logger.Logger + metrics *serverMetrics +} + +// InsertSchema inserts a new schema property. 
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req *schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "insert") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "insert") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "insert") + }() + req.Property.Metadata.Group = schemaGroup + existQuery := &propertyv1.QueryRequest{ + Groups: []string{schemaGroup}, + Name: req.Property.Metadata.Name, + Ids: []string{req.Property.Id}, + } + existing, queryErr := s.server.db.Query(ctx, existQuery) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "insert") + s.l.Error().Err(queryErr).Msg("failed to check schema existence") + return nil, queryErr + } + for _, result := range existing { + if result.DeleteTime() == 0 { + s.metrics.totalErr.Inc(1, "insert") + return nil, fmt.Errorf("schema already exists") + } + } + id := db.GetPropertyID(req.Property) + if updateErr := s.server.db.Update(ctx, defaultShardID, id, req.Property); updateErr != nil { + s.metrics.totalErr.Inc(1, "insert") + s.l.Error().Err(updateErr).Msg("failed to insert schema") + return nil, updateErr + } + return &schemav1.InsertSchemaResponse{}, nil +} + +// UpdateSchema updates an existing schema property. 
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req *schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "update") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "update") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "update") + }() + req.Property.Metadata.Group = schemaGroup + id := db.GetPropertyID(req.Property) + if updateErr := s.server.db.Update(ctx, defaultShardID, id, req.Property); updateErr != nil { + s.metrics.totalErr.Inc(1, "update") + s.l.Error().Err(updateErr).Msg("failed to update schema") + return nil, updateErr + } + return &schemav1.UpdateSchemaResponse{}, nil +} + +const listSchemasBatchSize = 100 + +// ListSchemas lists schema properties via server streaming. 
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest, + stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse], +) error { + if req.Query == nil { + return errInvalidRequest("query is required") + } + s.metrics.totalStarted.Inc(1, "list") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "list") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list") + }() + req.Query.Groups = []string{schemaGroup} + results, queryErr := s.server.db.Query(stream.Context(), req.Query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "list") + s.l.Error().Err(queryErr).Msg("failed to list schemas") + return queryErr + } + for batchStart := 0; batchStart < len(results); batchStart += listSchemasBatchSize { + batchEnd := batchStart + listSchemasBatchSize + if batchEnd > len(results) { + batchEnd = len(results) + } + batch := results[batchStart:batchEnd] + props := make([]*propertyv1.Property, 0, len(batch)) + deleteTimes := make([]int64, 0, len(batch)) + for _, result := range batch { + var p propertyv1.Property + if unmarshalErr := protojson.Unmarshal(result.Source(), &p); unmarshalErr != nil { + s.metrics.totalErr.Inc(1, "list") + return unmarshalErr + } + props = append(props, &p) + deleteTimes = append(deleteTimes, result.DeleteTime()) + } + if sendErr := stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes: deleteTimes}); sendErr != nil { + s.metrics.totalErr.Inc(1, "list") + return sendErr + } + } + return nil +} + +// DeleteSchema deletes a schema property. 
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req *schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) { + if req.Delete == nil || req.UpdateAt == nil { + return nil, errInvalidRequest("delete request is required") + } + s.metrics.totalStarted.Inc(1, "delete") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "delete") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "delete") + }() + query := &propertyv1.QueryRequest{ + Groups: []string{schemaGroup}, + Name: req.Delete.Name, + } + if req.Delete.Id != "" { + query.Ids = []string{req.Delete.Id} + } + results, queryErr := s.server.db.Query(ctx, query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "delete") + s.l.Error().Err(queryErr).Msg("failed to delete schema") + return nil, queryErr + } + if len(results) == 0 { + return &schemav1.DeleteSchemaResponse{Found: false}, nil + } + ids := make([][]byte, 0, len(results)) + for _, result := range results { + if result.DeleteTime() > 0 { + continue + } + ids = append(ids, result.ID()) + } + if len(ids) == 0 { + return &schemav1.DeleteSchemaResponse{Found: false}, nil + } + if deleteErr := s.server.db.Delete(ctx, ids, req.UpdateAt.AsTime()); deleteErr != nil { + s.metrics.totalErr.Inc(1, "delete") + s.l.Error().Err(deleteErr).Msg("failed to delete schema") + return nil, deleteErr + } + return &schemav1.DeleteSchemaResponse{Found: true}, nil +} + +// RepairSchema repairs a schema property with the specified delete time. 
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req *schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) { + if req.Property == nil { + return nil, errInvalidRequest("property is required") + } + if req.Property.Metadata == nil { + return nil, errInvalidRequest("metadata should not be nil") + } + if req.Property.Metadata.ModRevision == 0 { + return nil, errInvalidRequest("mod_revision should be set for update") + } + s.metrics.totalStarted.Inc(1, "repair") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "repair") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "repair") + }() + req.Property.Metadata.Group = schemaGroup + id := db.GetPropertyID(req.Property) + if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID), req.Property, req.DeleteTime); repairErr != nil { + s.metrics.totalErr.Inc(1, "repair") + s.l.Error().Err(repairErr).Msg("failed to repair schema") + return nil, repairErr + } + return &schemav1.RepairSchemaResponse{}, nil +} + +// ExistSchema checks if a schema property exists. 
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req *schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) { + if req.Query == nil { + return nil, errInvalidRequest("query is required") + } + s.metrics.totalStarted.Inc(1, "exist") + start := time.Now() + defer func() { + s.metrics.totalFinished.Inc(1, "exist") + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist") + }() + req.Query.Groups = []string{schemaGroup} + req.Query.Limit = 1 + results, queryErr := s.server.db.Query(ctx, req.Query) + if queryErr != nil { + s.metrics.totalErr.Inc(1, "exist") + s.l.Error().Err(queryErr).Msg("failed to check schema existence") + return nil, queryErr + } + return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil Review Comment: `ExistSchema` currently returns `HasSchema=true` if *any* document matches, including soft-deleted ones (since it only checks `len(results) > 0`). This makes existence checks incorrect after deletion. Filter results by `DeleteTime()==0`. Note that the filter alone is not sufficient while `req.Query.Limit = 1` is kept: soft-deleted and live revisions can coexist (as `DeleteSchema` and `InsertSchema` already assume), so the single returned document may be a deleted one even though a live one exists. Remove or raise that limit as well; the suggestion below only replaces the final `return`. ```suggestion hasSchema := false for _, r := range results { if r.DeleteTime() == 0 { hasSchema = true break } } return &schemav1.ExistSchemaResponse{HasSchema: hasSchema}, nil ``` ########## banyand/metadata/schema/schemaserver/grpc_test.go: ########## @@ -0,0 +1,430 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schemaserver + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" +) + +func startTestServer(t *testing.T) (schemav1.SchemaManagementServiceClient, schemav1.SchemaUpdateServiceClient) { + t.Helper() + srv := NewServer(observability.BypassRegistry).(*server) + flagSet := srv.FlagSet() + require.NoError(t, flagSet.Parse([]string{})) + srv.root = t.TempDir() + srv.host = "127.0.0.1" + srv.port = getFreePort(t) + require.NoError(t, srv.Validate()) + require.NoError(t, srv.PreRun(context.Background())) + srv.Serve() + t.Cleanup(func() { srv.GracefulStop() }) + time.Sleep(100 * time.Millisecond) Review Comment: `startTestServer` uses `time.Sleep(100 * time.Millisecond)` to wait for server readiness, which can be flaky under load/CI. Consider waiting deterministically (e.g., poll the gRPC health service or attempt to dial with retries until success / context deadline). 
```suggestion // Wait deterministically for the server to start accepting connections. deadline := time.Now().Add(5 * time.Second) for { conn, err := net.DialTimeout("tcp", srv.addr, 100*time.Millisecond) if err == nil { _ = conn.Close() break } if time.Now().After(deadline) { require.FailNowf(t, "server did not start listening in time", "last error: %v", err) } time.Sleep(50 * time.Millisecond) } ``` ########## banyand/metadata/schema/schemaserver/service.go: ########## @@ -0,0 +1,322 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schemaserver implements a standalone gRPC server for schema property management. 
+package schemaserver + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/common" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/db" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + pkgtls "github.com/apache/skywalking-banyandb/pkg/tls" +) + +const ( + schemaGroup = "_schema" + defaultShardID = common.ShardID(0) + defaultRecvSize = 10 << 20 +) + +var ( + _ run.PreRunner = (*server)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) +) + +// Server is the interface for the standalone schema server. 
+type Server interface { + run.PreRunner + run.Config + run.Service + GetPort() *uint32 +} + +type server struct { + db db.Database + lfs fs.FileSystem + omr observability.MetricsRegistry + closer *run.Closer + l *logger.Logger + ser *grpclib.Server + tlsReloader *pkgtls.Reloader + schemaService *schemaManagementServer + updateService *schemaUpdateServer + host string + certFile string + root string + keyFile string + addr string + repairBuildTreeCron string + minFileSnapshotAge time.Duration + flushTimeout time.Duration + snapshotSeq uint64 + expireTimeout time.Duration + repairQuickBuildTreeTime time.Duration + maxRecvMsgSize run.Bytes + maxFileSnapshotNum int + repairTreeSlotCount int + snapshotMu sync.Mutex + port uint32 + tls bool +} + +// NewServer returns a new standalone schema server. +func NewServer(omr observability.MetricsRegistry) Server { + return &server{ + omr: omr, + closer: run.NewCloser(0), + } +} + +// GetPort returns the gRPC server port. +func (s *server) GetPort() *uint32 { + return &s.port +} + +func (s *server) Name() string { + return "schema-server" +} + +func (s *server) FlagSet() *run.FlagSet { + flagS := run.NewFlagSet("schema-server") + s.maxRecvMsgSize = defaultRecvSize + flagS.StringVar(&s.root, "schema-server-root-path", "/tmp", "root storage path") + flagS.StringVar(&s.host, "schema-server-grpc-host", "", "the host of schema server") + flagS.Uint32Var(&s.port, "schema-server-grpc-port", 17916, "the port of schema server") + flagS.DurationVar(&s.flushTimeout, "schema-server-flush-timeout", 5*time.Second, "memory flush interval") + flagS.DurationVar(&s.expireTimeout, "schema-server-expire-delete-timeout", time.Hour*24*7, "soft-delete expiration") + flagS.BoolVar(&s.tls, "schema-server-tls", false, "connection uses TLS if true") + flagS.StringVar(&s.certFile, "schema-server-cert-file", "", "the TLS cert file") + flagS.StringVar(&s.keyFile, "schema-server-key-file", "", "the TLS key file") + flagS.VarP(&s.maxRecvMsgSize, 
"schema-server-max-recv-msg-size", "", "max gRPC receive message size") + flagS.IntVar(&s.repairTreeSlotCount, "schema-server-repair-tree-slot-count", 32, "repair tree slot count") + flagS.StringVar(&s.repairBuildTreeCron, "schema-server-repair-build-tree-cron", "@every 1h", + "cron for repair tree building") + flagS.DurationVar(&s.repairQuickBuildTreeTime, "schema-server-repair-quick-build-tree-time", + time.Minute*10, "schema-quick build tree duration") + flagS.IntVar(&s.maxFileSnapshotNum, "schema-server-max-file-snapshot-num", 10, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.minFileSnapshotAge, "schema-server-min-file-snapshot-age", time.Hour, "the minimum age for file snapshots to be eligible for deletion") + return flagS +} + +func (s *server) Validate() error { + if s.root == "" { + return errors.New("root path must not be empty") + } + if s.port == 0 { + s.port = 17920 + } + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { + return errors.New("no address") + } + if !s.tls { + return nil + } + if s.certFile == "" { + return errors.New("invalid server cert file") + } + if s.keyFile == "" { + return errors.New("invalid server key file") + } + return nil +} + +func (s *server) PreRun(_ context.Context) error { + s.l = logger.GetLogger("schema-server") + s.lfs = fs.NewLocalFileSystem() + + grpcFactory := s.omr.With(schemaServerScope.SubScope("grpc")) + sm := newServerMetrics(grpcFactory) + s.schemaService = &schemaManagementServer{ + server: s, + l: s.l, + metrics: sm, + } + s.updateService = &schemaUpdateServer{ + server: s, + l: s.l, + metrics: sm, + } + + if s.tls { + var tlsErr error + s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile, s.keyFile, s.l) + if tlsErr != nil { + return errors.Wrap(tlsErr, "failed to initialize TLS reloader for server") + } + } + + dataDir := filepath.Join(s.root, "schema-property", "data") + snapshotDir := filepath.Join(s.root, "schema-property", 
"snapshots") + repairDir := filepath.Join(s.root, "schema-property", "repair") + + cfg := db.Config{ + Location: dataDir, + FlushInterval: s.flushTimeout, + ExpireToDeleteDuration: s.expireTimeout, + Repair: db.RepairConfig{ + Enabled: true, + Location: repairDir, + BuildTreeCron: s.repairBuildTreeCron, + QuickBuildTreeTime: s.repairQuickBuildTreeTime, + TreeSlotCount: s.repairTreeSlotCount, + }, + Index: db.IndexConfig{ + BatchWaitSec: 5, + WaitForPersistence: false, + }, + Snapshot: db.SnapshotConfig{ + Location: snapshotDir, + Func: func(ctx context.Context) (string, error) { + s.snapshotMu.Lock() + defer s.snapshotMu.Unlock() + storage.DeleteStaleSnapshots(snapshotDir, s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs) + sn := s.snapshotName() + snapshot := s.db.TakeSnapShot(ctx, sn) + if snapshot.Error != "" { + return "", fmt.Errorf("failed to find snapshot %s: %s", sn, snapshot.Error) Review Comment: The snapshot error message says "failed to find snapshot" but this code is creating/taking a snapshot. Consider changing the wording to "failed to take snapshot" (and include the underlying error) to avoid confusion when troubleshooting. ```suggestion return "", fmt.Errorf("failed to take snapshot %s: %s", sn, snapshot.Error) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
