Copilot commented on code in PR #971:
URL: https://github.com/apache/skywalking-banyandb/pull/971#discussion_r2793015468
##########
banyand/property/gossip/service.go:
##########
@@ -238,7 +238,7 @@ func (s *service) Serve(stopCh chan struct{}) {
lis, err := net.Listen("tcp", s.addr)
if err != nil {
s.log.Error().Err(err).Msg("Failed to listen")
- close(stopCh)
+ closer.CloseThenWait()
return
}
Review Comment:
On `net.Listen` failure, the goroutine returns without calling `wg.Done()`,
so the second goroutine (`wg.Wait()`) will block forever. Ensure `wg.Done()` is
called on all exit paths (e.g., `defer wg.Done()` at goroutine start) and avoid
leaking goroutines.
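
For illustration, a minimal self-contained sketch of the suggested pattern; the `serve` helper and its arguments are hypothetical, and only the `defer wg.Done()` placement mirrors the recommendation:

```go
package main

import (
	"log"
	"net"
	"sync"
)

// serve is a hypothetical stand-in for the listener goroutine in Serve().
// The key point: wg.Done() is deferred at goroutine start, so it runs even
// when net.Listen fails and the goroutine returns early, and wg.Wait() can
// never block forever on that path.
func serve(addr string, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		lis, err := net.Listen("tcp", addr)
		if err != nil {
			log.Printf("failed to listen: %v", err)
			return
		}
		defer lis.Close()
		// ... accept and serve connections on lis ...
	}()
}

func main() {
	var wg sync.WaitGroup
	serve("127.0.0.1:0", &wg)
	wg.Wait()
}
```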
##########
banyand/metadata/embeddedserver/server.go:
##########
@@ -30,20 +30,30 @@ import (
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+
"github.com/apache/skywalking-banyandb/banyand/metadata/schema/propertyserver"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var (
+ schemaTypeEtcd = "embededetcd"
Review Comment:
The default schema storage type constant is spelled `embededetcd`, but the
flag help text says `embedetcd`. This mismatch is user-facing and makes it easy
to pass a value that won’t match the intended option. Align the constant and
the help string (and consider validating unknown values).
```suggestion
schemaTypeEtcd = "embedetcd"
```
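
For illustration, a hedged sketch of validating the flag value against the known constant; the `validateSchemaType` helper is hypothetical, and only the `"embedetcd"` spelling comes from the suggestion above:

```go
package main

import "fmt"

const schemaTypeEtcd = "embedetcd"

// validateSchemaType rejects unknown schema storage types early, so a typo
// such as "embededetcd" fails fast instead of silently selecting nothing.
func validateSchemaType(v string) error {
	switch v {
	case schemaTypeEtcd:
		return nil
	default:
		return fmt.Errorf("unknown schema storage type %q (expected %q)", v, schemaTypeEtcd)
	}
}

func main() {
	fmt.Println(validateSchemaType("embedetcd"))   // <nil>
	fmt.Println(validateSchemaType("embededetcd")) // unknown schema storage type ...
}
```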
##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+ return &serverMetrics{
+ totalStarted: factory.NewCounter("total_started", "method"),
+ totalFinished: factory.NewCounter("total_finished", "method"),
+ totalErr: factory.NewCounter("total_err", "method"),
+ totalLatency: factory.NewCounter("total_latency", "method"),
+ }
+}
+
+type schemaManagementServer struct {
+ schemav1.UnimplementedSchemaManagementServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "insert")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "insert")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"insert")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ existQuery := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Property.Metadata.Name,
+ Ids: []string{req.Property.Id},
+ Limit: 1,
+ }
+ existing, queryErr := s.server.db.Query(ctx, existQuery)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ for _, result := range existing {
+ if result.DeleteTime() == 0 {
+ s.metrics.totalErr.Inc(1, "insert")
+ return nil, fmt.Errorf("schema already exists: name=%s, id=%s", req.Property.Metadata.Name, req.Property.Id)
+ }
+ }
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(updateErr).Msg("failed to insert schema")
+ return nil, updateErr
+ }
+ return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "update")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "update")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"update")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "update")
+ s.l.Error().Err(updateErr).Msg("failed to update schema")
+ return nil, updateErr
+ }
+ return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+ stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+ if req.Query == nil {
+ return errInvalidRequest("query is required")
+ }
+ s.metrics.totalStarted.Inc(1, "list")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "list")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+ }()
+ req.Query.Groups = []string{schemaGroup}
+ results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ s.l.Error().Err(queryErr).Msg("failed to list schemas")
+ return queryErr
+ }
+ for batchStart := 0; batchStart < len(results); batchStart +=
listSchemasBatchSize {
+ batchEnd := batchStart + listSchemasBatchSize
+ if batchEnd > len(results) {
+ batchEnd = len(results)
+ }
+ batch := results[batchStart:batchEnd]
+ props := make([]*propertyv1.Property, 0, len(batch))
+ deleteTimes := make([]int64, 0, len(batch))
+ for _, result := range batch {
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(),
&p); unmarshalErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return unmarshalErr
+ }
+ props = append(props, &p)
+ deleteTimes = append(deleteTimes, result.DeleteTime())
+ }
+ if sendErr :=
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes:
deleteTimes}); sendErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return sendErr
+ }
+ }
+ return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+ if req.Delete == nil {
+ return nil, errInvalidRequest("delete request is required")
+ }
+ s.metrics.totalStarted.Inc(1, "delete")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "delete")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"delete")
+ }()
+ query := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Delete.Name,
+ }
+ if req.Delete.Id != "" {
+ query.Ids = []string{req.Delete.Id}
+ }
+ results, queryErr := s.server.db.Query(ctx, query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "delete")
+ s.l.Error().Err(queryErr).Msg("failed to delete schema")
+ return nil, queryErr
+ }
+ if len(results) == 0 {
+ return &schemav1.DeleteSchemaResponse{Found: false}, nil
+ }
+ ids := make([][]byte, 0, len(results))
+ for _, result := range results {
+ if result.DeleteTime() > 0 {
+ continue
+ }
+ ids = append(ids, result.ID())
+ }
+ if len(ids) == 0 {
+ return &schemav1.DeleteSchemaResponse{Found: false}, nil
+ }
+ if deleteErr := s.server.db.Delete(ctx, ids); deleteErr != nil {
+ s.metrics.totalErr.Inc(1, "delete")
+ s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+ return nil, deleteErr
+ }
+ return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "repair")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "repair")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"repair")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ id := db.GetPropertyID(req.Property)
+ if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID),
req.Property, req.DeleteTime); repairErr != nil {
+ s.metrics.totalErr.Inc(1, "repair")
+ s.l.Error().Err(repairErr).Msg("failed to repair schema")
+ return nil, repairErr
+ }
+ return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+ if req.Query == nil {
+ return nil, errInvalidRequest("query is required")
+ }
+ s.metrics.totalStarted.Inc(1, "exist")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "exist")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+ }()
+ req.Query.Groups = []string{schemaGroup}
+ req.Query.Limit = 1
+ results, queryErr := s.server.db.Query(ctx, req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "exist")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil
+}
+
+type schemaUpdateServer struct {
+ schemav1.UnimplementedSchemaUpdateServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// AggregateSchemaUpdates returns distinct schema names that have been modified.
+func (s *schemaUpdateServer) AggregateSchemaUpdates(ctx context.Context,
+ req *schemav1.AggregateSchemaUpdatesRequest,
+) (*schemav1.AggregateSchemaUpdatesResponse, error) {
+ s.metrics.totalStarted.Inc(1, "aggregate")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "aggregate")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"aggregate")
+ }()
+ if req.Query == nil {
+ return nil, errInvalidRequest("query is required")
+ }
+ req.Query.Groups = []string{schemaGroup}
+ results, queryErr := s.server.db.Query(ctx, req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "aggregate")
+ s.l.Error().Err(queryErr).Msg("failed to aggregate schema updates")
+ return nil, queryErr
+ }
+ nameSet := make(map[string]struct{}, len(results))
+ for _, result := range results {
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(), &p);
unmarshalErr != nil {
+ s.metrics.totalErr.Inc(1, "aggregate")
+ return nil, unmarshalErr
+ }
+ if p.Metadata != nil && p.Metadata.Name != "" {
+ nameSet[p.Metadata.Name] = struct{}{}
+ }
+ }
+ names := make([]string, 0, len(nameSet))
+ for name := range nameSet {
+ names = append(names, name)
+ }
+ return &schemav1.AggregateSchemaUpdatesResponse{Names: names}, nil
+}
+
+func errInvalidRequest(msg string) error {
+ return &invalidRequestError{msg: msg}
+}
+
+type invalidRequestError struct {
+ msg string
+}
+
+func (e *invalidRequestError) Error() string {
+ return "invalid request: " + e.msg
+}
Review Comment:
Request validation errors are returned as plain Go errors (e.g., `invalid
request: ...`), which will surface to gRPC clients as `codes.Unknown`. Convert
these to `status.Error(codes.InvalidArgument, ...)` (or map this custom error
type in an interceptor) so clients can reliably distinguish bad input from
server failures.
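
A minimal sketch of the suggested mapping, keeping the `errInvalidRequest` helper name from the diff; `status` and `codes` are the standard gRPC packages already used elsewhere in this PR:

```go
package propertyserver

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errInvalidRequest surfaces validation failures to gRPC clients as
// codes.InvalidArgument instead of codes.Unknown.
func errInvalidRequest(msg string) error {
	return status.Error(codes.InvalidArgument, "invalid request: "+msg)
}
```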
##########
banyand/metadata/schema/propertyserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyserver implements a standalone gRPC server for schema
property management.
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "path"
+ "path/filepath"
+ "runtime/debug"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+ grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+ "github.com/pkg/errors"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+ schemaGroup = "_schema"
+ defaultShardID = common.ShardID(0)
+ defaultRecvSize = 10 << 20
+)
+
+var (
+ _ run.PreRunner = (*server)(nil)
+ _ run.Config = (*server)(nil)
+ _ run.Service = (*server)(nil)
+)
+
+// Server is the interface for the standalone property server.
+type Server interface {
+ run.PreRunner
+ run.Config
+ run.Service
+ GetPort() *uint32
+}
+
+type server struct {
+ db db.Database
+ lfs fs.FileSystem
+ omr observability.MetricsRegistry
+ closer *run.Closer
+ l *logger.Logger
+ ser *grpclib.Server
+ tlsReloader *pkgtls.Reloader
+ schemaService *schemaManagementServer
+ updateService *schemaUpdateServer
+ host string
+ certFile string
+ root string
+ keyFile string
+ addr string
+ repairBuildTreeCron string
+ minFileSnapshotAge time.Duration
+ flushTimeout time.Duration
+ snapshotSeq uint64
+ expireTimeout time.Duration
+ repairQuickBuildTreeTime time.Duration
+ maxRecvMsgSize run.Bytes
+ maxFileSnapshotNum int
+ repairTreeSlotCount int
+ snapshotMu sync.Mutex
+ port uint32
+ tls bool
+}
+
+// NewServer returns a new standalone property server.
+func NewServer(omr observability.MetricsRegistry) Server {
+ return &server{
+ omr: omr,
+ closer: run.NewCloser(0),
+ }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+ return &s.port
+}
+
+func (s *server) Name() string {
+ return "property-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+ flagS := run.NewFlagSet("schema-property-server")
+ s.maxRecvMsgSize = defaultRecvSize
+ flagS.StringVar(&s.root, "schema-property-server-root-path", "/tmp",
"root storage path")
+ flagS.StringVar(&s.host, "schema-property-server-grpc-host", "", "the host of schema property server")
+ flagS.Uint32Var(&s.port, "schema-property-server-grpc-port", 17916,
"the port of schema property server")
+ flagS.DurationVar(&s.flushTimeout,
"schema-property-server-flush-timeout", 5*time.Second, "memory flush interval")
+ flagS.DurationVar(&s.expireTimeout,
"schema-property-server-expire-delete-timeout", time.Hour*24*7, "soft-delete
expiration")
+ flagS.BoolVar(&s.tls, "schema-property-server-tls", false, "connection uses TLS if true")
+ flagS.StringVar(&s.certFile, "schema-property-server-cert-file", "",
"the TLS cert file")
+ flagS.StringVar(&s.keyFile, "schema-property-server-key-file", "", "the TLS key file")
+ flagS.VarP(&s.maxRecvMsgSize,
"schema-property-server-max-recv-msg-size", "", "max gRPC receive message size")
+ flagS.IntVar(&s.repairTreeSlotCount,
"schema-property-server-repair-tree-slot-count", 32, "repair tree slot count")
+ flagS.StringVar(&s.repairBuildTreeCron,
"schema-property-server-repair-build-tree-cron", "@every 1h",
+ "cron for repair tree building")
+ flagS.DurationVar(&s.repairQuickBuildTreeTime,
"schema-property-server-repair-quick-build-tree-time",
+ s.repairQuickBuildTreeTime, "schema-quick build tree duration")
+ flagS.IntVar(&s.maxFileSnapshotNum,
"schema-property-server-max-file-snapshot-num", 10, "the maximum number of file
snapshots allowed")
+ flagS.DurationVar(&s.minFileSnapshotAge,
"schema-property-server-min-file-snapshot-age", time.Hour, "the minimum age for
file snapshots to be eligible for deletion")
+ return flagS
+}
+
+func (s *server) Validate() error {
+ if s.root == "" {
+ return errors.New("root path must not be empty")
+ }
+ if s.port == 0 {
+ s.port = 17920
+ }
+ s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port),
10))
+ if s.addr == ":" {
+ return errors.New("no address")
+ }
+ if !s.tls {
+ return nil
+ }
+ if s.certFile == "" {
+ return errors.New("invalid server cert file")
+ }
+ if s.keyFile == "" {
+ return errors.New("invalid server key file")
+ }
+ return nil
+}
+
+func (s *server) PreRun(_ context.Context) error {
+ s.l = logger.GetLogger("schema-property-server")
+ s.lfs = fs.NewLocalFileSystem()
+
+ grpcFactory := s.omr.With(propertyServerScope.SubScope("grpc"))
+ sm := newServerMetrics(grpcFactory)
+ s.schemaService = &schemaManagementServer{
+ server: s,
+ l: s.l,
+ metrics: sm,
+ }
+ s.updateService = &schemaUpdateServer{
+ server: s,
+ l: s.l,
+ metrics: sm,
+ }
+
+ if s.tls {
+ var tlsErr error
+ s.tlsReloader, tlsErr = pkgtls.NewReloader(s.certFile,
s.keyFile, s.l)
+ if tlsErr != nil {
+ return errors.Wrap(tlsErr, "failed to initialize TLS reloader for property server")
+ }
+ }
+
+ dataDir := filepath.Join(s.root, "schema-property", "data")
+ snapshotDir := filepath.Join(s.root, "schema-property", "snapshots")
+ repairDir := filepath.Join(s.root, "schema-property", "repair")
+
+ cfg := db.Config{
+ Location: dataDir,
+ FlushInterval: s.flushTimeout,
+ ExpireToDeleteDuration: s.expireTimeout,
+ Repair: db.RepairConfig{
+ Enabled: true,
+ Location: repairDir,
+ BuildTreeCron: s.repairBuildTreeCron,
+ QuickBuildTreeTime: s.repairQuickBuildTreeTime,
+ TreeSlotCount: s.repairTreeSlotCount,
+ },
+ Index: db.IndexConfig{
+ BatchWaitSec: 5,
+ WaitForPersistence: false,
+ },
+ Snapshot: db.SnapshotConfig{
+ Location: snapshotDir,
+ Func: func(ctx context.Context) (string, error) {
+ s.snapshotMu.Lock()
+ defer s.snapshotMu.Unlock()
+ storage.DeleteStaleSnapshots(snapshotDir,
s.maxFileSnapshotNum, s.minFileSnapshotAge, s.lfs)
+ sn := s.snapshotName()
+ snapshot := s.db.TakeSnapShot(ctx, sn)
+ if snapshot.Error != "" {
+ return "", fmt.Errorf("failed to find snapshot %s: %s", sn, snapshot.Error)
+ }
+ return path.Join(snapshotDir, sn,
storage.DataDir), nil
+ },
+ },
+ }
+
+ var openErr error
+ // nolint:contextcheck
+ s.db, openErr = db.OpenDB(s.closer.Ctx(), cfg, s.omr, s.lfs)
+ if openErr != nil {
+ return errors.Wrap(openErr, "failed to open property database")
+ }
+ s.l.Info().Str("root", s.root).Msg("property database initialized")
+ return nil
+}
+
+func (s *server) snapshotName() string {
+ s.snapshotSeq++
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
+}
+
+func (s *server) Serve() run.StopNotify {
+ var opts []grpclib.ServerOption
+ if s.tls {
+ if s.tlsReloader != nil {
+ if startErr := s.tlsReloader.Start(); startErr != nil {
+ s.l.Error().Err(startErr).Msg("Failed to start TLS reloader for property server")
+ return s.closer.CloseNotify()
+ }
+ s.l.Info().Str("certFile", s.certFile).Str("keyFile",
s.keyFile).
+ Msg("Started TLS file monitoring for property server")
+ tlsConfig := s.tlsReloader.GetTLSConfig()
+ creds := credentials.NewTLS(tlsConfig)
+ opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+ } else {
+ creds, tlsErr :=
credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+ if tlsErr != nil {
+ s.l.Error().Err(tlsErr).Msg("Failed to load TLS credentials")
+ return s.closer.CloseNotify()
+ }
+ opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+ }
+ }
+ grpcPanicRecoveryHandler := func(p any) (err error) {
+ s.l.Error().Interface("panic", p).Str("stack",
string(debug.Stack())).Msg("recovered from panic")
+ return status.Errorf(codes.Internal, "%s", p)
+ }
+ streamChain := []grpclib.StreamServerInterceptor{
+
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+ }
+ unaryChain := []grpclib.UnaryServerInterceptor{
+ grpc_validator.UnaryServerInterceptor(),
+
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+ }
+ opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
+ grpclib.ChainUnaryInterceptor(unaryChain...),
+ grpclib.ChainStreamInterceptor(streamChain...),
+ )
+ s.ser = grpclib.NewServer(opts...)
+ schemav1.RegisterSchemaManagementServiceServer(s.ser, s.schemaService)
+ schemav1.RegisterSchemaUpdateServiceServer(s.ser, s.updateService)
+ grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
+
+ s.closer.AddRunning()
+ go func() {
+ defer s.closer.Done()
+ lis, lisErr := net.Listen("tcp", s.addr)
+ if lisErr != nil {
+ s.l.Error().Err(lisErr).Msg("Failed to listen")
Review Comment:
If `net.Listen` fails, `Serve()` logs and returns without closing the
server's `Closer`. Since `Serve()` returns `s.closer.CloseNotify()`, callers
may block forever waiting for shutdown. Close the closer (and/or return a stop
notify that is closed) when the listener cannot be created.
```suggestion
s.l.Error().Err(lisErr).Msg("Failed to listen")
s.closer.Close()
```
##########
banyand/metadata/schema/propertyserver/service.go:
##########
@@ -0,0 +1,322 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyserver implements a standalone gRPC server for schema
property management.
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "path"
+ "path/filepath"
+ "runtime/debug"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+ grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+ "github.com/pkg/errors"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+ schemaGroup = "_schema"
+ defaultShardID = common.ShardID(0)
+ defaultRecvSize = 10 << 20
+)
+
+var (
+ _ run.PreRunner = (*server)(nil)
+ _ run.Config = (*server)(nil)
+ _ run.Service = (*server)(nil)
+)
+
+// Server is the interface for the standalone property server.
+type Server interface {
+ run.PreRunner
+ run.Config
+ run.Service
+ GetPort() *uint32
+}
+
+type server struct {
+ db db.Database
+ lfs fs.FileSystem
+ omr observability.MetricsRegistry
+ closer *run.Closer
+ l *logger.Logger
+ ser *grpclib.Server
+ tlsReloader *pkgtls.Reloader
+ schemaService *schemaManagementServer
+ updateService *schemaUpdateServer
+ host string
+ certFile string
+ root string
+ keyFile string
+ addr string
+ repairBuildTreeCron string
+ minFileSnapshotAge time.Duration
+ flushTimeout time.Duration
+ snapshotSeq uint64
+ expireTimeout time.Duration
+ repairQuickBuildTreeTime time.Duration
+ maxRecvMsgSize run.Bytes
+ maxFileSnapshotNum int
+ repairTreeSlotCount int
+ snapshotMu sync.Mutex
+ port uint32
+ tls bool
+}
+
+// NewServer returns a new standalone property server.
+func NewServer(omr observability.MetricsRegistry) Server {
+ return &server{
+ omr: omr,
+ closer: run.NewCloser(0),
+ }
+}
+
+// GetPort returns the gRPC server port.
+func (s *server) GetPort() *uint32 {
+ return &s.port
+}
+
+func (s *server) Name() string {
+ return "property-server"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+ flagS := run.NewFlagSet("schema-property-server")
+ s.maxRecvMsgSize = defaultRecvSize
+ flagS.StringVar(&s.root, "schema-property-server-root-path", "/tmp",
"root storage path")
+ flagS.StringVar(&s.host, "schema-property-server-grpc-host", "", "the host of schema property server")
+ flagS.Uint32Var(&s.port, "schema-property-server-grpc-port", 17916,
"the port of schema property server")
+ flagS.DurationVar(&s.flushTimeout,
"schema-property-server-flush-timeout", 5*time.Second, "memory flush interval")
+ flagS.DurationVar(&s.expireTimeout,
"schema-property-server-expire-delete-timeout", time.Hour*24*7, "soft-delete
expiration")
+ flagS.BoolVar(&s.tls, "schema-property-server-tls", false, "connection uses TLS if true")
+ flagS.StringVar(&s.certFile, "schema-property-server-cert-file", "",
"the TLS cert file")
+ flagS.StringVar(&s.keyFile, "schema-property-server-key-file", "", "the TLS key file")
+ flagS.VarP(&s.maxRecvMsgSize,
"schema-property-server-max-recv-msg-size", "", "max gRPC receive message size")
+ flagS.IntVar(&s.repairTreeSlotCount,
"schema-property-server-repair-tree-slot-count", 32, "repair tree slot count")
+ flagS.StringVar(&s.repairBuildTreeCron,
"schema-property-server-repair-build-tree-cron", "@every 1h",
+ "cron for repair tree building")
+ flagS.DurationVar(&s.repairQuickBuildTreeTime,
"schema-property-server-repair-quick-build-tree-time",
+ s.repairQuickBuildTreeTime, "schema-quick build tree duration")
+ flagS.IntVar(&s.maxFileSnapshotNum,
"schema-property-server-max-file-snapshot-num", 10, "the maximum number of file
snapshots allowed")
Review Comment:
`schema-property-server-repair-quick-build-tree-time` is registered with a
default of `s.repairQuickBuildTreeTime`, which is 0 unless set elsewhere. With
a 0 duration, repair build-tree can be triggered immediately on the first
update, which differs from the property service default (10m) and may cause
unexpected load. Set an explicit non-zero default here (or document why 0 is
desired).
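
For example, the registration could carry an explicit default; the 10m value below is an assumption based on the property service default mentioned above, not a verified constant:

```go
// Assumed default; align with the property service's quick-build-tree default.
const defaultRepairQuickBuildTreeTime = 10 * time.Minute

flagS.DurationVar(&s.repairQuickBuildTreeTime,
	"schema-property-server-repair-quick-build-tree-time",
	defaultRepairQuickBuildTreeTime, "quick repair tree build duration")
```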
##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+ return &serverMetrics{
+ totalStarted: factory.NewCounter("total_started", "method"),
+ totalFinished: factory.NewCounter("total_finished", "method"),
+ totalErr: factory.NewCounter("total_err", "method"),
+ totalLatency: factory.NewCounter("total_latency", "method"),
+ }
+}
+
+type schemaManagementServer struct {
+ schemav1.UnimplementedSchemaManagementServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "insert")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "insert")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"insert")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ existQuery := &propertyv1.QueryRequest{
Review Comment:
The server persists schema `Property` records without setting
`Metadata.CreateRevision`/`Metadata.ModRevision` (and `UpdatedAt`). Since the
property DB uses `Metadata.ModRevision` as the document timestamp and as part
of `GetPropertyID`, leaving it at 0 makes all revisions indistinguishable and
undermines update semantics. Consider assigning `CreateRevision` on the first
insert and updating `ModRevision` (and `UpdatedAt`) on every insert/update/repair,
similar to the existing property Apply flow.
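
A rough sketch of what stamping revisions in `InsertSchema` could look like (similarly for update/repair); it assumes nanosecond revisions and the `timestamppb` helper from `google.golang.org/protobuf/types/known/timestamppb`, and the exact scheme should follow the property Apply flow:

```go
// Illustrative fragment: stamp revisions before computing the ID so the
// stored document carries a meaningful ModRevision/UpdatedAt.
now := time.Now()
rev := now.UnixNano()
if req.Property.Metadata.CreateRevision == 0 {
	req.Property.Metadata.CreateRevision = rev // first insert keeps its creation revision
}
req.Property.Metadata.ModRevision = rev
req.Property.UpdatedAt = timestamppb.New(now)
id := db.GetPropertyID(req.Property) // the ID now reflects the new ModRevision
```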
##########
banyand/property/scheduler.go:
##########
@@ -0,0 +1,113 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+ "context"
+ "fmt"
+ "math/rand/v2"
+ "time"
+
+ "github.com/robfig/cron/v3"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+type repairScheduler struct {
+ metadata metadata.Repo
+ gossipMessenger gossip.Messenger
+ scheduler *timestamp.Scheduler
+}
+
+func newRepairScheduler(ctx context.Context, l *logger.Logger, cronExp string,
metadata metadata.Repo, messenger gossip.Messenger) (*repairScheduler, error) {
+ r := &repairScheduler{
+ metadata: metadata,
+ gossipMessenger: messenger,
+ scheduler: timestamp.NewScheduler(l,
timestamp.NewClock()),
+ }
+ err := r.scheduler.Register("trigger",
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+ cronExp, func(time.Time, *logger.Logger) bool {
+ l.Debug().Msgf("starting background repair gossip")
+ group, shardNum, nodes, gossipErr :=
r.doRepairGossip(ctx)
+ if gossipErr != nil {
+ l.Err(gossipErr).Msg("failed to repair gossip")
+ return true
+ }
+ l.Info().Str("group", group).Uint32("shardNum",
shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled")
+ return true
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to add repair trigger cron task: %w", err)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("failed to add repair build tree cron task: %w", err)
+ }
Review Comment:
`newRepairScheduler` checks `err` twice (the second `if err != nil` is
unreachable and returns the wrong message). Remove the duplicate check; if
there was meant to be a second cron registration, add it explicitly and check
its returned error instead.
```suggestion
```
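
If a second registration was in fact intended, a sketch could look like the following; the task name and the `buildTreeCronExp` value are hypothetical and would need to be threaded into `newRepairScheduler`:

```go
// Hypothetical second cron task; each Register call gets its own error check.
if err := r.scheduler.Register("build-tree",
	cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
	buildTreeCronExp, func(_ time.Time, l *logger.Logger) bool {
		l.Debug().Msg("starting background repair tree build")
		return true
	}); err != nil {
	return nil, fmt.Errorf("failed to add repair build tree cron task: %w", err)
}
```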
##########
api/proto/banyandb/schema/v1/internal.proto:
##########
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.schema.v1;
+
+import "banyandb/property/v1/property.proto";
+import "banyandb/property/v1/rpc.proto";
+import "google/protobuf/timestamp.proto";
+
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1";
+option java_package = "org.apache.skywalking.banyandb.schema.v1";
+
+message InsertSchemaRequest {
+ property.v1.Property property = 1;
+}
+
+message InsertSchemaResponse {}
+
+message UpdateSchemaRequest {
+ property.v1.Property property = 1;
+}
+
+message UpdateSchemaResponse {}
+
+message ListSchemasRequest {
+ property.v1.QueryRequest query = 1;
+}
+
+message ListSchemasResponse {
+ repeated property.v1.Property properties = 1;
+ // delete_times maps to properties in the same order.
+ // 0 means not deleted, >0 means deletion timestamp.
+ repeated int64 delete_times = 2;
+}
+
+message GetSchemaRequest {
+ property.v1.QueryRequest query = 1;
+}
+
+message GetSchemaResponse {
+ property.v1.Property properties = 1;
Review Comment:
`GetSchemaResponse` defines a singular `Property` field but names it
`properties`. This is confusing and will be part of the public proto contract
once released. Rename the field to `property` (or make it `repeated` if
multiple are intended) before consumers depend on it.
```suggestion
property.v1.Property property = 1;
```
##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+ return &serverMetrics{
+ totalStarted: factory.NewCounter("total_started", "method"),
+ totalFinished: factory.NewCounter("total_finished", "method"),
+ totalErr: factory.NewCounter("total_err", "method"),
+ totalLatency: factory.NewCounter("total_latency", "method"),
+ }
+}
+
+type schemaManagementServer struct {
+ schemav1.UnimplementedSchemaManagementServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "insert")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "insert")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"insert")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ existQuery := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Property.Metadata.Name,
+ Ids: []string{req.Property.Id},
+ Limit: 1,
+ }
+ existing, queryErr := s.server.db.Query(ctx, existQuery)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ for _, result := range existing {
+ if result.DeleteTime() == 0 {
+ s.metrics.totalErr.Inc(1, "insert")
+ return nil, fmt.Errorf("schema already exists: name=%s, id=%s", req.Property.Metadata.Name, req.Property.Id)
+ }
+ }
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(updateErr).Msg("failed to insert schema")
+ return nil, updateErr
+ }
+ return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "update")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "update")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"update")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "update")
+ s.l.Error().Err(updateErr).Msg("failed to update schema")
+ return nil, updateErr
+ }
+ return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+ stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+ if req.Query == nil {
+ return errInvalidRequest("query is required")
+ }
+ s.metrics.totalStarted.Inc(1, "list")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "list")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+ }()
+ req.Query.Groups = []string{schemaGroup}
+ results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ s.l.Error().Err(queryErr).Msg("failed to list schemas")
+ return queryErr
+ }
+ for batchStart := 0; batchStart < len(results); batchStart +=
listSchemasBatchSize {
+ batchEnd := batchStart + listSchemasBatchSize
+ if batchEnd > len(results) {
+ batchEnd = len(results)
+ }
+ batch := results[batchStart:batchEnd]
+ props := make([]*propertyv1.Property, 0, len(batch))
+ deleteTimes := make([]int64, 0, len(batch))
+ for _, result := range batch {
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(),
&p); unmarshalErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return unmarshalErr
+ }
+ props = append(props, &p)
+ deleteTimes = append(deleteTimes, result.DeleteTime())
+ }
+ if sendErr :=
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes:
deleteTimes}); sendErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return sendErr
+ }
+ }
+ return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+ if req.Delete == nil {
+ return nil, errInvalidRequest("delete request is required")
+ }
+ s.metrics.totalStarted.Inc(1, "delete")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "delete")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"delete")
+ }()
+ query := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Delete.Name,
+ }
+ if req.Delete.Id != "" {
+ query.Ids = []string{req.Delete.Id}
+ }
+ results, queryErr := s.server.db.Query(ctx, query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "delete")
+ s.l.Error().Err(queryErr).Msg("failed to delete schema")
+ return nil, queryErr
+ }
+ if len(results) == 0 {
+ return &schemav1.DeleteSchemaResponse{Found: false}, nil
+ }
+ ids := make([][]byte, 0, len(results))
+ for _, result := range results {
+ if result.DeleteTime() > 0 {
+ continue
+ }
+ ids = append(ids, result.ID())
+ }
+ if len(ids) == 0 {
+ return &schemav1.DeleteSchemaResponse{Found: false}, nil
+ }
+ if deleteErr := s.server.db.Delete(ctx, ids); deleteErr != nil {
+ s.metrics.totalErr.Inc(1, "delete")
+ s.l.Error().Err(deleteErr).Msg("failed to delete schema")
+ return nil, deleteErr
+ }
+ return &schemav1.DeleteSchemaResponse{Found: true}, nil
+}
+
+// RepairSchema repairs a schema property with the specified delete time.
+func (s *schemaManagementServer) RepairSchema(ctx context.Context, req
*schemav1.RepairSchemaRequest) (*schemav1.RepairSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "repair")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "repair")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"repair")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ id := db.GetPropertyID(req.Property)
+ if repairErr := s.server.db.Repair(ctx, id, uint64(defaultShardID),
req.Property, req.DeleteTime); repairErr != nil {
+ s.metrics.totalErr.Inc(1, "repair")
+ s.l.Error().Err(repairErr).Msg("failed to repair schema")
+ return nil, repairErr
+ }
+ return &schemav1.RepairSchemaResponse{}, nil
+}
+
+// ExistSchema checks if a schema property exists.
+func (s *schemaManagementServer) ExistSchema(ctx context.Context, req
*schemav1.ExistSchemaRequest) (*schemav1.ExistSchemaResponse, error) {
+ if req.Query == nil {
+ return nil, errInvalidRequest("query is required")
+ }
+ s.metrics.totalStarted.Inc(1, "exist")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "exist")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "exist")
+ }()
+ req.Query.Groups = []string{schemaGroup}
+ req.Query.Limit = 1
+ results, queryErr := s.server.db.Query(ctx, req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "exist")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ return &schemav1.ExistSchemaResponse{HasSchema: len(results) > 0}, nil
Review Comment:
`ExistSchema` sets `Limit = 1` and returns `HasSchema: len(results) > 0`,
which can incorrectly report existence for soft-deleted schemas and can miss
active schemas if the single returned row happens to be a deleted revision.
Scan results for at least one row with `DeleteTime()==0` and avoid `Limit=1`
unless you can guarantee ordering that prefers non-deleted/latest revisions.
```suggestion
	results, queryErr := s.server.db.Query(ctx, req.Query)
	if queryErr != nil {
		s.metrics.totalErr.Inc(1, "exist")
		s.l.Error().Err(queryErr).Msg("failed to check schema existence")
		return nil, queryErr
	}
	hasSchema := false
	for _, result := range results {
		// Consider the schema as existing only if it is not soft-deleted.
		if result.DeleteTime() == 0 {
			hasSchema = true
			break
		}
	}
	return &schemav1.ExistSchemaResponse{HasSchema: hasSchema}, nil
```
##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+ return &serverMetrics{
+ totalStarted: factory.NewCounter("total_started", "method"),
+ totalFinished: factory.NewCounter("total_finished", "method"),
+ totalErr: factory.NewCounter("total_err", "method"),
+ totalLatency: factory.NewCounter("total_latency", "method"),
+ }
+}
+
+type schemaManagementServer struct {
+ schemav1.UnimplementedSchemaManagementServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "insert")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "insert")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"insert")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ existQuery := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Property.Metadata.Name,
+ Ids: []string{req.Property.Id},
+ Limit: 1,
+ }
+ existing, queryErr := s.server.db.Query(ctx, existQuery)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ for _, result := range existing {
+ if result.DeleteTime() == 0 {
+ s.metrics.totalErr.Inc(1, "insert")
+ return nil, fmt.Errorf("schema already exists: name=%s, id=%s", req.Property.Metadata.Name, req.Property.Id)
+ }
+ }
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(updateErr).Msg("failed to insert schema")
+ return nil, updateErr
+ }
+ return &schemav1.InsertSchemaResponse{}, nil
+}
+
+// UpdateSchema updates an existing schema property.
+func (s *schemaManagementServer) UpdateSchema(ctx context.Context, req
*schemav1.UpdateSchemaRequest) (*schemav1.UpdateSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "update")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "update")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"update")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ id := db.GetPropertyID(req.Property)
+ if updateErr := s.server.db.Update(ctx, defaultShardID, id,
req.Property); updateErr != nil {
+ s.metrics.totalErr.Inc(1, "update")
+ s.l.Error().Err(updateErr).Msg("failed to update schema")
+ return nil, updateErr
+ }
+ return &schemav1.UpdateSchemaResponse{}, nil
+}
+
+const listSchemasBatchSize = 100
+
+// ListSchemas lists schema properties via server streaming.
+func (s *schemaManagementServer) ListSchemas(req *schemav1.ListSchemasRequest,
+ stream grpc.ServerStreamingServer[schemav1.ListSchemasResponse],
+) error {
+ if req.Query == nil {
+ return errInvalidRequest("query is required")
+ }
+ s.metrics.totalStarted.Inc(1, "list")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "list")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(), "list")
+ }()
+ req.Query.Groups = []string{schemaGroup}
+ results, queryErr := s.server.db.Query(stream.Context(), req.Query)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ s.l.Error().Err(queryErr).Msg("failed to list schemas")
+ return queryErr
+ }
+ for batchStart := 0; batchStart < len(results); batchStart +=
listSchemasBatchSize {
+ batchEnd := batchStart + listSchemasBatchSize
+ if batchEnd > len(results) {
+ batchEnd = len(results)
+ }
+ batch := results[batchStart:batchEnd]
+ props := make([]*propertyv1.Property, 0, len(batch))
+ deleteTimes := make([]int64, 0, len(batch))
+ for _, result := range batch {
+ var p propertyv1.Property
+ if unmarshalErr := protojson.Unmarshal(result.Source(),
&p); unmarshalErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return unmarshalErr
+ }
+ props = append(props, &p)
+ deleteTimes = append(deleteTimes, result.DeleteTime())
+ }
+ if sendErr :=
stream.Send(&schemav1.ListSchemasResponse{Properties: props, DeleteTimes:
deleteTimes}); sendErr != nil {
+ s.metrics.totalErr.Inc(1, "list")
+ return sendErr
+ }
+ }
+ return nil
+}
+
+// DeleteSchema deletes a schema property.
+func (s *schemaManagementServer) DeleteSchema(ctx context.Context, req
*schemav1.DeleteSchemaRequest) (*schemav1.DeleteSchemaResponse, error) {
+ if req.Delete == nil {
+ return nil, errInvalidRequest("delete request is required")
+ }
+ s.metrics.totalStarted.Inc(1, "delete")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "delete")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"delete")
+ }()
+ query := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Delete.Name,
+ }
+ if req.Delete.Id != "" {
+ query.Ids = []string{req.Delete.Id}
+ }
+ results, queryErr := s.server.db.Query(ctx, query)
+ if queryErr != nil {
Review Comment:
`DeleteSchemaRequest` includes `update_at` (comment says it's for
notification), but `DeleteSchema` currently ignores it and does not update the
stored record's revision/time accordingly. If downstream consumers rely on this
for change notification, wire `update_at` into the delete marker (e.g., set
`ModRevision`/`UpdatedAt` when marking deleted).
##########
banyand/metadata/schema/propertyserver/grpc.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package propertyserver
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/db"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var propertyServerScope = observability.RootScope.SubScope("schema_property")
+
+type serverMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+}
+
+func newServerMetrics(factory observability.Factory) *serverMetrics {
+ return &serverMetrics{
+ totalStarted: factory.NewCounter("total_started", "method"),
+ totalFinished: factory.NewCounter("total_finished", "method"),
+ totalErr: factory.NewCounter("total_err", "method"),
+ totalLatency: factory.NewCounter("total_latency", "method"),
+ }
+}
+
+type schemaManagementServer struct {
+ schemav1.UnimplementedSchemaManagementServiceServer
+ server *server
+ l *logger.Logger
+ metrics *serverMetrics
+}
+
+// InsertSchema inserts a new schema property.
+func (s *schemaManagementServer) InsertSchema(ctx context.Context, req
*schemav1.InsertSchemaRequest) (*schemav1.InsertSchemaResponse, error) {
+ if req.Property == nil {
+ return nil, errInvalidRequest("property is required")
+ }
+ if req.Property.Metadata == nil {
+ return nil, errInvalidRequest("metadata should not be nil")
+ }
+ s.metrics.totalStarted.Inc(1, "insert")
+ start := time.Now()
+ defer func() {
+ s.metrics.totalFinished.Inc(1, "insert")
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
"insert")
+ }()
+ req.Property.Metadata.Group = schemaGroup
+ existQuery := &propertyv1.QueryRequest{
+ Groups: []string{schemaGroup},
+ Name: req.Property.Metadata.Name,
+ Ids: []string{req.Property.Id},
+ Limit: 1,
+ }
+ existing, queryErr := s.server.db.Query(ctx, existQuery)
+ if queryErr != nil {
+ s.metrics.totalErr.Inc(1, "insert")
+ s.l.Error().Err(queryErr).Msg("failed to check schema existence")
+ return nil, queryErr
+ }
+ for _, result := range existing {
+ if result.DeleteTime() == 0 {
+ s.metrics.totalErr.Inc(1, "insert")
+ return nil, fmt.Errorf("schema already exists: name=%s, id=%s", req.Property.Metadata.Name, req.Property.Id)
+ }
Review Comment:
`InsertSchema` checks for duplicates with `Limit: 1`. The underlying
`Search` results are not guaranteed to be ordered (callers sort explicitly
elsewhere), so a single returned row could be a deleted revision while an
active (non-deleted) revision exists, allowing duplicates. Query without such a
limit (or ensure deterministic ordering) and treat the schema as existing if
*any* matching row has `DeleteTime()==0`. Also consider returning a gRPC
`AlreadyExists` status rather than a plain `fmt.Errorf`.
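
A hedged sketch of that check, reusing the query and `DeleteTime()` APIs already shown in the diff and assuming the `google.golang.org/grpc/status` and `codes` imports:

```go
existQuery := &propertyv1.QueryRequest{
	Groups: []string{schemaGroup},
	Name:   req.Property.Metadata.Name,
	Ids:    []string{req.Property.Id},
	// No Limit: ordering is not guaranteed, so inspect every matching revision.
}
existing, queryErr := s.server.db.Query(ctx, existQuery)
if queryErr != nil {
	s.metrics.totalErr.Inc(1, "insert")
	s.l.Error().Err(queryErr).Msg("failed to check schema existence")
	return nil, queryErr
}
for _, result := range existing {
	if result.DeleteTime() == 0 { // any live revision means the schema already exists
		s.metrics.totalErr.Inc(1, "insert")
		return nil, status.Errorf(codes.AlreadyExists,
			"schema already exists: name=%s, id=%s", req.Property.Metadata.Name, req.Property.Id)
	}
}
```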
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]