hanahmily commented on code in PR #962: URL: https://github.com/apache/skywalking-banyandb/pull/962#discussion_r2753686014
########## banyand/metadata/schema/property/client.go: ########## @@ -0,0 +1,1388 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +var 
errNoMetadataServers = errors.New("no metadata servers available") + +type clientMetrics struct { + totalSyncStarted meter.Counter + totalSyncFinished meter.Counter + totalSyncErr meter.Counter + totalSyncLatency meter.Counter + totalSyncUpdates meter.Counter + + totalQueryStarted meter.Counter + totalQueryFinished meter.Counter + totalQueryErr meter.Counter + totalQueryLatency meter.Counter + + totalRepairStarted meter.Counter + totalRepairFinished meter.Counter + totalRepairNodes meter.Counter + + totalWriteStarted meter.Counter + totalWriteFinished meter.Counter + totalWriteErr meter.Counter + totalWriteLatency meter.Counter + + totalDeleteStarted meter.Counter + totalDeleteFinished meter.Counter + totalDeleteErr meter.Counter + totalDeleteLatency meter.Counter + + cacheSize meter.Gauge +} + +func newClientMetrics(factory observability.Factory) *clientMetrics { + return &clientMetrics{ + totalSyncStarted: factory.NewCounter("property_sync_started"), + totalSyncFinished: factory.NewCounter("property_sync_finished"), + totalSyncErr: factory.NewCounter("property_sync_err"), + totalSyncLatency: factory.NewCounter("property_sync_latency"), + totalSyncUpdates: factory.NewCounter("property_sync_updates"), + + totalQueryStarted: factory.NewCounter("property_query_started", "method"), + totalQueryFinished: factory.NewCounter("property_query_finished", "method"), + totalQueryErr: factory.NewCounter("property_query_err", "method"), + totalQueryLatency: factory.NewCounter("property_query_latency", "method"), + + totalRepairStarted: factory.NewCounter("property_repair_started"), + totalRepairFinished: factory.NewCounter("property_repair_finished"), + totalRepairNodes: factory.NewCounter("property_repair_nodes"), + + totalWriteStarted: factory.NewCounter("property_write_started", "kind", "group"), + totalWriteFinished: factory.NewCounter("property_write_finished", "kind", "group"), + totalWriteErr: factory.NewCounter("property_write_err", "kind", "group"), + 
totalWriteLatency: factory.NewCounter("property_write_latency", "kind", "group"), + + totalDeleteStarted: factory.NewCounter("property_delete_started", "kind", "group"), + totalDeleteFinished: factory.NewCounter("property_delete_finished", "kind", "group"), + totalDeleteErr: factory.NewCounter("property_delete_err", "kind", "group"), + totalDeleteLatency: factory.NewCounter("property_delete_latency", "kind", "group"), + + cacheSize: factory.NewGauge("property_cache_size"), + } +} + +// nodeConnection represents a connection to a metadata node. +type nodeConnection struct { + mgrClient schemav1.SchemaManagementServiceClient + updateClient schemav1.SchemaUpdateServiceClient + conn *grpc.ClientConn +} + +// SchemaRegistry implements schema.Registry interface using property-based storage. +type SchemaRegistry struct { + dialOptsPrv common.GRPCDialOptionsProvider + handlers map[schema.Kind][]schema.EventHandler + nodeConns map[string]*nodeConnection + cache *schemaCache + closer *run.Closer + syncCloser *run.Closer + l *logger.Logger + metrics *clientMetrics + mu sync.RWMutex + connMu sync.RWMutex + grpcTimeout time.Duration + syncInterval time.Duration +} + +// NewSchemaRegistryClient creates a new property schema registry client. +// It accepts a NodeDiscovery service to dynamically discover and connect to metadata nodes. +func NewSchemaRegistryClient(dialOptsProvider common.GRPCDialOptionsProvider, grpcTimeout, syncInterval time.Duration) *SchemaRegistry { + r := &SchemaRegistry{ + dialOptsPrv: dialOptsProvider, + nodeConns: make(map[string]*nodeConnection), + handlers: make(map[schema.Kind][]schema.EventHandler), + cache: newSchemaCache(), + closer: run.NewCloser(1), + l: logger.GetLogger("property-schema-registry"), + grpcTimeout: grpcTimeout, + syncInterval: syncInterval, + } + return r +} + +// SetMetrics initializes the client metrics using the provided metrics registry. 
+func (r *SchemaRegistry) SetMetrics(omr observability.MetricsRegistry) { + if omr == nil { + return + } + clientScope := metadataScope.SubScope("client") + r.metrics = newClientMetrics(omr.With(clientScope)) +} + +// OnInit implements schema.EventHandler. +func (r *SchemaRegistry) OnInit(_ []schema.Kind) (bool, []int64) { + return false, nil +} + +// OnAddOrUpdate implements schema.EventHandler for getting the all metadata nodes. +func (r *SchemaRegistry) OnAddOrUpdate(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + containsMetadata := false + for _, role := range node.Roles { + if role == databasev1.Role_ROLE_META { + containsMetadata = true + break + } + } + if !containsMetadata { + return + } + r.addNodeConnection(node) +} + +// OnDelete implements schema.EventHandler for getting which metadata node has been deleted. +func (r *SchemaRegistry) OnDelete(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + r.removeNodeConnection(node.GetGrpcAddress()) +} + +func (r *SchemaRegistry) addNodeConnection(node *databasev1.Node) { + address := node.GetGrpcAddress() + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + if _, exists := r.nodeConns[address]; exists { Review Comment: ## Extract `pub/client.go` resiliency into a shared component `banyand/queue/pub/client.go` has a solid set of **connection-management + resiliency** features (retry policy, health check, eviction/reconnect, shutdown-cancellable goroutines, probe dedupe). `banyand/metadata/schema/property/client.go` needs the same primitives, but re-implementing them twice will diverge quickly. I recommend extracting these concerns into a reusable package and letting both `pub` and `property` compose it. 
### What to extract (shared “node gRPC pool” responsibilities)

- **Dial + config**: apply common `grpc.DialOption`s (TLS creds, max recv size, default service config retry policy).
- **Health gate**: a health check (`grpc_health_v1` and/or service-specific HealthCheck).
- **Evict/reconnect loop**: on unhealthy detection or RPC failures, close conn → move node to “evictable” → reconnect with jittered backoff → restore to “active”.
- **Lifecycle management**: all background goroutines must be tied to a `run.Closer` and stop on `CloseNotify()`.
- **Deduped probes**: prevent concurrent reconnect/probe storms for the same node.

########## banyand/metadata/client/client.go: ########## @@ -157,6 +171,11 @@ func (s *clientService) FlagSet() *run.FlagSet { fs.Float64Var(&s.fileRetryMultiplier, "node-discovery-file-retry-multiplier", 2.0, "Backoff multiplier for retry intervals in file discovery mode") + // schema management configuration + fs.StringVar(&s.metadataRegistryMode, "schema-registry-mode", "etcd",

Review Comment:
```suggestion
fs.StringVar(&s.metadataRegistryMode, "schema-registry-mode", "property",
```

########## banyand/metadata/client/client.go: ########## @@ -290,6 +271,64 @@ func (s *clientService) PreRun(ctx context.Context) error { } } + if s.metadataRegistryMode == RegistryModeEtcd { + for { + var err error + s.schemaRegistry, err = etcd.NewEtcdSchemaRegistry( + etcd.Namespace(s.namespace), + etcd.ConfigureServerEndpoints(s.endpoints), + etcd.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword), + etcd.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile), + etcd.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile), + etcd.ConfigureWatchCheckInterval(s.etcdFullSyncInterval), + ) + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + select { + case <-stopCh: + return errors.New("pre-run interrupted") + case <-time.After(s.registryTimeout): + return errors.New("pre-run timeout") + case <-s.closer.CloseNotify(): + return
errors.New("pre-run interrupted") + default: + l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...") + time.Sleep(time.Second) + continue + } + } + if err == nil { + break + } + return err + } + } else if s.metadataRegistryMode == RegistryModeProperty { + if s.nodeDiscoveryMode == NodeDiscoveryModeEtcd { + return errors.New("property registry mode does not support etcd-based node discovery, please use DNS or file mode") Review Comment: Property registry mode should support any node discovery mode, including etcd, DNS, or file-based. ########## banyand/metadata/schema/property/client.go: ########## @@ -0,0 +1,1388 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +var errNoMetadataServers = errors.New("no metadata servers available") + +type clientMetrics struct { + totalSyncStarted meter.Counter + totalSyncFinished meter.Counter + totalSyncErr meter.Counter + totalSyncLatency meter.Counter + totalSyncUpdates meter.Counter + + totalQueryStarted meter.Counter + totalQueryFinished meter.Counter + totalQueryErr meter.Counter + totalQueryLatency meter.Counter + + totalRepairStarted meter.Counter + totalRepairFinished meter.Counter + totalRepairNodes meter.Counter + + totalWriteStarted meter.Counter + totalWriteFinished meter.Counter + totalWriteErr meter.Counter + totalWriteLatency meter.Counter + + totalDeleteStarted meter.Counter + totalDeleteFinished meter.Counter + totalDeleteErr meter.Counter + totalDeleteLatency meter.Counter + + cacheSize meter.Gauge +} + +func newClientMetrics(factory observability.Factory) *clientMetrics { + return &clientMetrics{ + totalSyncStarted: 
factory.NewCounter("property_sync_started"), + totalSyncFinished: factory.NewCounter("property_sync_finished"), + totalSyncErr: factory.NewCounter("property_sync_err"), + totalSyncLatency: factory.NewCounter("property_sync_latency"), + totalSyncUpdates: factory.NewCounter("property_sync_updates"), + + totalQueryStarted: factory.NewCounter("property_query_started", "method"), + totalQueryFinished: factory.NewCounter("property_query_finished", "method"), + totalQueryErr: factory.NewCounter("property_query_err", "method"), + totalQueryLatency: factory.NewCounter("property_query_latency", "method"), + + totalRepairStarted: factory.NewCounter("property_repair_started"), + totalRepairFinished: factory.NewCounter("property_repair_finished"), + totalRepairNodes: factory.NewCounter("property_repair_nodes"), + + totalWriteStarted: factory.NewCounter("property_write_started", "kind", "group"), + totalWriteFinished: factory.NewCounter("property_write_finished", "kind", "group"), + totalWriteErr: factory.NewCounter("property_write_err", "kind", "group"), + totalWriteLatency: factory.NewCounter("property_write_latency", "kind", "group"), + + totalDeleteStarted: factory.NewCounter("property_delete_started", "kind", "group"), + totalDeleteFinished: factory.NewCounter("property_delete_finished", "kind", "group"), + totalDeleteErr: factory.NewCounter("property_delete_err", "kind", "group"), + totalDeleteLatency: factory.NewCounter("property_delete_latency", "kind", "group"), + + cacheSize: factory.NewGauge("property_cache_size"), + } +} + +// nodeConnection represents a connection to a metadata node. +type nodeConnection struct { + mgrClient schemav1.SchemaManagementServiceClient + updateClient schemav1.SchemaUpdateServiceClient + conn *grpc.ClientConn +} + +// SchemaRegistry implements schema.Registry interface using property-based storage. 
+type SchemaRegistry struct { + dialOptsPrv common.GRPCDialOptionsProvider + handlers map[schema.Kind][]schema.EventHandler + nodeConns map[string]*nodeConnection + cache *schemaCache + closer *run.Closer + syncCloser *run.Closer + l *logger.Logger + metrics *clientMetrics + mu sync.RWMutex + connMu sync.RWMutex + grpcTimeout time.Duration + syncInterval time.Duration +} + +// NewSchemaRegistryClient creates a new property schema registry client. +// It accepts a NodeDiscovery service to dynamically discover and connect to metadata nodes. +func NewSchemaRegistryClient(dialOptsProvider common.GRPCDialOptionsProvider, grpcTimeout, syncInterval time.Duration) *SchemaRegistry { + r := &SchemaRegistry{ + dialOptsPrv: dialOptsProvider, + nodeConns: make(map[string]*nodeConnection), + handlers: make(map[schema.Kind][]schema.EventHandler), + cache: newSchemaCache(), + closer: run.NewCloser(1), + l: logger.GetLogger("property-schema-registry"), + grpcTimeout: grpcTimeout, + syncInterval: syncInterval, + } + return r +} + +// SetMetrics initializes the client metrics using the provided metrics registry. +func (r *SchemaRegistry) SetMetrics(omr observability.MetricsRegistry) { + if omr == nil { + return + } + clientScope := metadataScope.SubScope("client") + r.metrics = newClientMetrics(omr.With(clientScope)) +} + +// OnInit implements schema.EventHandler. +func (r *SchemaRegistry) OnInit(_ []schema.Kind) (bool, []int64) { + return false, nil +} + +// OnAddOrUpdate implements schema.EventHandler for getting the all metadata nodes. 
+func (r *SchemaRegistry) OnAddOrUpdate(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + containsMetadata := false + for _, role := range node.Roles { + if role == databasev1.Role_ROLE_META { + containsMetadata = true + break + } + } + if !containsMetadata { + return + } + r.addNodeConnection(node) +} + +// OnDelete implements schema.EventHandler for getting which metadata node has been deleted. +func (r *SchemaRegistry) OnDelete(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + r.removeNodeConnection(node.GetGrpcAddress()) +} + +func (r *SchemaRegistry) addNodeConnection(node *databasev1.Node) { + address := node.GetGrpcAddress() + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + if _, exists := r.nodeConns[address]; exists { + return + } + var dialOpts []grpc.DialOption + if r.dialOptsPrv != nil { + var optsErr error + dialOpts, optsErr = r.dialOptsPrv.GetDialOptions(address) + if optsErr != nil { + r.l.Warn().Err(optsErr).Str("address", address).Msg("failed to get dial options") + return + } + } + conn, connErr := grpchelper.Conn(address, r.grpcTimeout, dialOpts...) 
+ if connErr != nil { + r.l.Warn().Err(connErr).Str("address", address).Msg("failed to connect to metadata node") + return + } + schemaMgrClient := schemav1.NewSchemaManagementServiceClient(conn) + schemaUpdateClient := schemav1.NewSchemaUpdateServiceClient(conn) + nodeConn := &nodeConnection{ + mgrClient: schemaMgrClient, + updateClient: schemaUpdateClient, + conn: conn, + } + r.nodeConns[address] = nodeConn + r.l.Info().Str("address", address).Str("node", node.GetMetadata().GetName()).Msg("connected to metadata node") + go r.initializeFromNode(nodeConn) +} + +func (r *SchemaRegistry) removeNodeConnection(address string) { + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + nc, exists := r.nodeConns[address] + if !exists { + return + } + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", address).Msg("failed to close connection") + } + } + delete(r.nodeConns, address) + r.l.Info().Str("address", address).Msg("disconnected from metadata node") +} + +// Close closes the registry. +func (r *SchemaRegistry) Close() error { + r.closer.Done() + r.closer.CloseThenWait() + if r.syncCloser != nil { + r.syncCloser.Done() + r.syncCloser.CloseThenWait() + } + r.connMu.Lock() + defer r.connMu.Unlock() + for addr, nc := range r.nodeConns { + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", addr).Msg("failed to close connection") + } + } + } + r.nodeConns = make(map[string]*nodeConnection) + return nil +} + +// RegisterHandler registers an event handler for a schema kind. 
+func (r *SchemaRegistry) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) { + // Validate kind + if kind&schema.KindMask != kind { + panic(fmt.Sprintf("invalid kind %d", kind)) + } + r.mu.Lock() + defer r.mu.Unlock() + var kinds []schema.Kind + for i := 0; i < schema.KindSize; i++ { + ki := schema.Kind(1 << i) + if kind&ki > 0 { + kinds = append(kinds, ki) + } + } + r.l.Info().Str("name", name).Interface("kinds", kinds).Msg("registering handler") + handler.OnInit(kinds) + for _, ki := range kinds { + r.addHandler(ki, handler) + } +} + +// Register registers a metadata entry. +func (r *SchemaRegistry) Register(context.Context, schema.Metadata, bool) error { + panic("property based schema registry not support register") +} + +// Compact is not supported in property mode. +func (r *SchemaRegistry) Compact(context.Context, int64) error { + return nil +} + +// StartWatcher starts the global sync mechanism. +func (r *SchemaRegistry) StartWatcher() { + r.syncCloser = run.NewCloser(1) + go r.globalSync() +} + +// Stream methods. + +// GetStream retrieves a stream by metadata. +func (r *SchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error) { + return getResource[*databasev1.Stream](ctx, r, schema.KindStream, metadata.GetGroup(), metadata.GetName()) +} + +// ListStream lists streams in a group. +func (r *SchemaRegistry) ListStream(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Stream, error) { + return listResources[*databasev1.Stream](ctx, r, schema.KindStream, opt.Group, true) +} + +// CreateStream creates a new stream. +func (r *SchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { + now := time.Now().UnixNano() + stream.Metadata.ModRevision = now + stream.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindStream, stream) +} + +// UpdateStream updates an existing stream. 
+func (r *SchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { + now := time.Now().UnixNano() + stream.Metadata.ModRevision = now + stream.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindStream, stream) +} + +// DeleteStream deletes a stream. +func (r *SchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindStream, metadata.GetGroup(), metadata.GetName()) +} + +// Measure methods. + +// GetMeasure retrieves a measure by metadata. +func (r *SchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error) { + return getResource[*databasev1.Measure](ctx, r, schema.KindMeasure, metadata.GetGroup(), metadata.GetName()) +} + +// ListMeasure lists measures in a group. +func (r *SchemaRegistry) ListMeasure(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Measure, error) { + return listResources[*databasev1.Measure](ctx, r, schema.KindMeasure, opt.Group, true) +} + +// CreateMeasure creates a new measure. +func (r *SchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { + now := time.Now().UnixNano() + measure.Metadata.ModRevision = now + measure.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindMeasure, measure) +} + +// UpdateMeasure updates an existing measure. +func (r *SchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { + now := time.Now().UnixNano() + measure.Metadata.ModRevision = now + measure.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindMeasure, measure) +} + +// DeleteMeasure deletes a measure. 
+func (r *SchemaRegistry) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindMeasure, metadata.GetGroup(), metadata.GetName()) +} + +// TopNAggregations returns top-N aggregations for a measure. +func (r *SchemaRegistry) TopNAggregations(ctx context.Context, metadata *commonv1.Metadata) ([]*databasev1.TopNAggregation, error) { + aggregations, listErr := r.ListTopNAggregation(ctx, schema.ListOpt{Group: metadata.GetGroup()}) + if listErr != nil { + return nil, listErr + } + var result []*databasev1.TopNAggregation + for _, aggrDef := range aggregations { + if aggrDef.GetSourceMeasure().GetName() == metadata.GetName() { + result = append(result, aggrDef) + } + } + return result, nil +} + +// Trace methods. + +// GetTrace retrieves a trace by metadata. +func (r *SchemaRegistry) GetTrace(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Trace, error) { + return getResource[*databasev1.Trace](ctx, r, schema.KindTrace, metadata.GetGroup(), metadata.GetName()) +} + +// ListTrace lists traces in a group. +func (r *SchemaRegistry) ListTrace(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Trace, error) { + return listResources[*databasev1.Trace](ctx, r, schema.KindTrace, opt.Group, true) +} + +// CreateTrace creates a new trace. +func (r *SchemaRegistry) CreateTrace(ctx context.Context, trace *databasev1.Trace) (int64, error) { + now := time.Now().UnixNano() + trace.Metadata.ModRevision = now + trace.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindTrace, trace) +} + +// UpdateTrace updates an existing trace. +func (r *SchemaRegistry) UpdateTrace(ctx context.Context, trace *databasev1.Trace) (int64, error) { + now := time.Now().UnixNano() + trace.Metadata.ModRevision = now + trace.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindTrace, trace) +} + +// DeleteTrace deletes a trace. 
+func (r *SchemaRegistry) DeleteTrace(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindTrace, metadata.GetGroup(), metadata.GetName()) +} + +// Group methods. + +// GetGroup retrieves a group by name. +func (r *SchemaRegistry) GetGroup(ctx context.Context, group string) (*commonv1.Group, error) { + return getResource[*commonv1.Group](ctx, r, schema.KindGroup, "", group) +} + +// ListGroup lists all groups. +func (r *SchemaRegistry) ListGroup(ctx context.Context) ([]*commonv1.Group, error) { + return listResources[*commonv1.Group](ctx, r, schema.KindGroup, "", false) +} + +// CreateGroup creates a new group. +func (r *SchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Group) error { + now := time.Now().UnixNano() + group.Metadata.ModRevision = now + group.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindGroup, group) +} + +// UpdateGroup updates an existing group. +func (r *SchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error { + now := time.Now().UnixNano() + group.Metadata.ModRevision = now + group.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindGroup, group) +} + +// DeleteGroup deletes a group. +func (r *SchemaRegistry) DeleteGroup(ctx context.Context, group string) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindGroup, "", group) +} + +// IndexRule methods. + +// GetIndexRule retrieves an index rule by metadata. +func (r *SchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error) { + return getResource[*databasev1.IndexRule](ctx, r, schema.KindIndexRule, metadata.GetGroup(), metadata.GetName()) +} + +// ListIndexRule lists index rules in a group. 
+func (r *SchemaRegistry) ListIndexRule(ctx context.Context, opt schema.ListOpt) ([]*databasev1.IndexRule, error) { + return listResources[*databasev1.IndexRule](ctx, r, schema.KindIndexRule, opt.Group, true) +} + +// CreateIndexRule creates a new index rule. +func (r *SchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error { + now := time.Now().UnixNano() + indexRule.Metadata.ModRevision = now + indexRule.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindIndexRule, indexRule) +} + +// UpdateIndexRule updates an existing index rule. +func (r *SchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error { + now := time.Now().UnixNano() + indexRule.Metadata.ModRevision = now + indexRule.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindIndexRule, indexRule) +} + +// DeleteIndexRule deletes an index rule. +func (r *SchemaRegistry) DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindIndexRule, metadata.GetGroup(), metadata.GetName()) +} + +// IndexRuleBinding methods. + +// GetIndexRuleBinding retrieves an index rule binding by metadata. +func (r *SchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error) { + return getResource[*databasev1.IndexRuleBinding](ctx, r, schema.KindIndexRuleBinding, metadata.GetGroup(), metadata.GetName()) +} + +// ListIndexRuleBinding lists index rule bindings in a group. +func (r *SchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt schema.ListOpt) ([]*databasev1.IndexRuleBinding, error) { + return listResources[*databasev1.IndexRuleBinding](ctx, r, schema.KindIndexRuleBinding, opt.Group, true) +} + +// CreateIndexRuleBinding creates a new index rule binding. 
+func (r *SchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error { + now := time.Now().UnixNano() + indexRuleBinding.Metadata.ModRevision = now + indexRuleBinding.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindIndexRuleBinding, indexRuleBinding) +} + +// UpdateIndexRuleBinding updates an existing index rule binding. +func (r *SchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error { + now := time.Now().UnixNano() + indexRuleBinding.Metadata.ModRevision = now + indexRuleBinding.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindIndexRuleBinding, indexRuleBinding) +} + +// DeleteIndexRuleBinding deletes an index rule binding. +func (r *SchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindIndexRuleBinding, metadata.GetGroup(), metadata.GetName()) +} + +// TopNAggregation methods. + +// GetTopNAggregation retrieves a top-N aggregation by metadata. +func (r *SchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.TopNAggregation, error) { + return getResource[*databasev1.TopNAggregation](ctx, r, schema.KindTopNAggregation, metadata.GetGroup(), metadata.GetName()) +} + +// ListTopNAggregation lists top-N aggregations in a group. +func (r *SchemaRegistry) ListTopNAggregation(ctx context.Context, opt schema.ListOpt) ([]*databasev1.TopNAggregation, error) { + return listResources[*databasev1.TopNAggregation](ctx, r, schema.KindTopNAggregation, opt.Group, true) +} + +// CreateTopNAggregation creates a new top-N aggregation. 
+func (r *SchemaRegistry) CreateTopNAggregation(ctx context.Context, topN *databasev1.TopNAggregation) error { + now := time.Now().UnixNano() + topN.Metadata.ModRevision = now + topN.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindTopNAggregation, topN) +} + +// UpdateTopNAggregation updates an existing top-N aggregation. +func (r *SchemaRegistry) UpdateTopNAggregation(ctx context.Context, topN *databasev1.TopNAggregation) error { + now := time.Now().UnixNano() + topN.Metadata.ModRevision = now + topN.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindTopNAggregation, topN) +} + +// DeleteTopNAggregation deletes a top-N aggregation. +func (r *SchemaRegistry) DeleteTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindTopNAggregation, metadata.GetGroup(), metadata.GetName()) +} + +// Node methods. + +// ListNode lists nodes by role. +func (r *SchemaRegistry) ListNode(context.Context, databasev1.Role) ([]*databasev1.Node, error) { + panic("property based schema registry does not support list node") +} + +// RegisterNode registers a node. +func (r *SchemaRegistry) RegisterNode(context.Context, *databasev1.Node, bool) error { + panic("property based schema registry does not support register node") +} + +// GetNode retrieves a node by name. +func (r *SchemaRegistry) GetNode(context.Context, string) (*databasev1.Node, error) { + panic("property based schema registry does not support get node") +} + +// UpdateNode updates a node. +func (r *SchemaRegistry) UpdateNode(context.Context, *databasev1.Node) error { + panic("property based schema registry does not support update node") +} + +// Property methods. + +// GetProperty retrieves a property by metadata. 
+func (r *SchemaRegistry) GetProperty(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Property, error) { + return getResource[*databasev1.Property](ctx, r, schema.KindProperty, metadata.GetGroup(), metadata.GetName()) +} + +// ListProperty lists properties in a group. +func (r *SchemaRegistry) ListProperty(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Property, error) { + return listResources[*databasev1.Property](ctx, r, schema.KindProperty, opt.Group, true) +} + +// CreateProperty creates a new property. +func (r *SchemaRegistry) CreateProperty(ctx context.Context, property *databasev1.Property) error { + now := time.Now().UnixNano() + property.Metadata.ModRevision = now + property.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindProperty, property) +} + +// UpdateProperty updates an existing property. +func (r *SchemaRegistry) UpdateProperty(ctx context.Context, property *databasev1.Property) error { + now := time.Now().UnixNano() + property.Metadata.ModRevision = now + property.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindProperty, property) +} + +// DeleteProperty deletes a property. 
+func (r *SchemaRegistry) DeleteProperty(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindProperty, metadata.GetGroup(), metadata.GetName()) +} + +func getResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, group, name string) (T, error) { + var zero T + prop, getErr := r.getSchema(ctx, kind, group, name) + if getErr != nil { + return zero, getErr + } + if prop == nil { + return zero, schema.ErrGRPCResourceNotFound + } + md, convErr := ToSchema(kind, prop) + if convErr != nil { + return zero, convErr + } + result, ok := md.Spec.(T) + if !ok { + return zero, errors.Errorf("unexpected spec type for kind %s", kind) + } + return result, nil +} + +func listResources[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, group string, requireGroup bool) ([]T, error) { + if requireGroup && group == "" { + return nil, schema.BadRequest("group", "group should not be empty") + } + props, listErr := r.listSchemas(ctx, kind, group) + if listErr != nil { + return nil, listErr + } + results := make([]T, 0, len(props)) + for _, prop := range props { + md, convErr := ToSchema(kind, prop) + if convErr != nil { + r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to convert property") + continue + } + result, ok := md.Spec.(T) + if !ok { + r.l.Warn().Stringer("kind", kind).Msg("unexpected spec type") + continue + } + results = append(results, result) + } + return results, nil +} + +func initResourceFromClient[T proto.Message](ctx context.Context, r *SchemaRegistry, client schemav1.SchemaManagementServiceClient, + kind schema.Kind, group string, maxRevision *int64, +) { + props, listErr := r.listSchemasFromClient(ctx, client, kind, group) + if listErr != nil { + r.l.Warn().Err(listErr).Str("group", group).Msg("failed to list streams") + return + } + for _, prop := range props { + md, convErr := ToSchema(kind, prop) + if convErr != nil { + 
r.l.Warn().Err(convErr).Msgf("failed to convert property to %s", kind) + continue + } + *maxRevision = r.processInitialResource(kind, md.Spec.(T), *maxRevision) + } +} + +func createResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, spec T) error { + metadata, err := getMetadataFromSpec(kind, spec) + if err != nil { + return err + } + originalSchema, err := r.getSchema(ctx, kind, metadata.Group, metadata.Name) + if err != nil { + return err + } + if originalSchema != nil { + return fmt.Errorf("schema %s/%s already exist", metadata.Group, metadata.Name) + } + prop, convErr := SchemaToProperty(kind, spec) + if convErr != nil { + return convErr + } + return r.insertToAllServers(ctx, prop) +} + +func updateResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, spec T) error { + metadata, err := getMetadataFromSpec(kind, spec) + if err != nil { + return err + } + originalSchema, err := r.getSchema(ctx, kind, metadata.Group, metadata.Name) + if err != nil { + return err + } + if originalSchema == nil { + return fmt.Errorf("schema %s/%s not exist", metadata.Group, metadata.Name) + } + prop, convErr := SchemaToProperty(kind, spec) + if convErr != nil { + return convErr + } + return r.updateToAllServers(ctx, prop) +} + +func (r *SchemaRegistry) getSchema(ctx context.Context, kind schema.Kind, group, name string) (*propertyv1.Property, error) { + propID := BuildPropertyID(kind, &commonv1.Metadata{Group: group, Name: name}) + query := buildSchemaQuery(kind, group, []string{propID}) + propMap, queryErr := r.queryAndRepairSchemas(ctx, query) + if queryErr != nil { + return nil, queryErr + } + info := propMap[propID] + if info == nil || info.best == nil || info.best.deleteTime > 0 { + return nil, nil + } + return info.best.property, nil +} + +type schemaWithDeleteTime struct { + property *propertyv1.Property + deleteTime int64 +} + +type propInfo struct { + nodeRev map[string]int64 + nodeDelTime map[string]int64 + 
best *schemaWithDeleteTime +} + +func (r *SchemaRegistry) listSchemas(ctx context.Context, kind schema.Kind, group string) ([]*propertyv1.Property, error) { + query := buildSchemaQuery(kind, group, nil) + propMap, queryErr := r.queryAndRepairSchemas(ctx, query) + if queryErr != nil { + return nil, queryErr + } + result := make([]*propertyv1.Property, 0, len(propMap)) + for _, info := range propMap { + if info.best != nil && info.best.deleteTime == 0 { + result = append(result, info.best.property) + } + } + return result, nil +} + +func (r *SchemaRegistry) getNodeConnectionsWithAddr() map[string]*nodeConnection { + r.connMu.RLock() + defer r.connMu.RUnlock() + result := make(map[string]*nodeConnection, len(r.nodeConns)) + for addr, nc := range r.nodeConns { + result[addr] = nc + } + return result +} + +func (r *SchemaRegistry) querySchemasFromClient(ctx context.Context, client schemav1.SchemaManagementServiceClient, + query *propertyv1.QueryRequest, +) ([]*schemaWithDeleteTime, error) { + resp, listErr := client.ListSchemas(ctx, &schemav1.ListSchemasRequest{Query: query}) + if listErr != nil { + return nil, listErr + } + results := make([]*schemaWithDeleteTime, 0, len(resp.Properties)) + for idx, prop := range resp.Properties { + var deleteTime int64 + if idx < len(resp.DeleteTimes) { + deleteTime = resp.DeleteTimes[idx] + } + results = append(results, &schemaWithDeleteTime{ + property: prop, + deleteTime: deleteTime, + }) + } + return results, nil +} + +func buildSchemaQuery(kind schema.Kind, group string, ids []string) *propertyv1.QueryRequest { + query := &propertyv1.QueryRequest{ + Groups: []string{SchemaGroup}, + Name: kind.String(), + Ids: ids, + Limit: 10000, + } + if group != "" { + query.Criteria = &modelv1.Criteria{ + Exp: &modelv1.Criteria_Condition{ + Condition: &modelv1.Condition{ + Name: TagKeyGroup, + Op: modelv1.Condition_BINARY_OP_EQ, + Value: &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: group}}, + }, + }, + }, + } + } 
+ return query +} + +type schemaQueryNodeResult struct { + err error + addr string + schemas []*schemaWithDeleteTime +} + +func (r *SchemaRegistry) queryAndRepairSchemas(ctx context.Context, query *propertyv1.QueryRequest) (map[string]*propInfo, error) { + if r.metrics != nil { + r.metrics.totalQueryStarted.Inc(1, "list") + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalQueryFinished.Inc(1, "list") + r.metrics.totalQueryLatency.Inc(time.Since(start).Seconds(), "list") + } + }() + nodeConns := r.getNodeConnectionsWithAddr() + if len(nodeConns) == 0 { + if r.metrics != nil { + r.metrics.totalQueryErr.Inc(1, "list") + } + return nil, errNoMetadataServers + } + resultCh := make(chan schemaQueryNodeResult, len(nodeConns)) + for addr, nc := range nodeConns { + go func(address string, client schemav1.SchemaManagementServiceClient) { + schemas, queryErr := r.querySchemasFromClient(ctx, client, query) + resultCh <- schemaQueryNodeResult{addr: address, schemas: schemas, err: queryErr} + }(addr, nc.mgrClient) + } + propMap := make(map[string]*propInfo) + for range len(nodeConns) { + res := <-resultCh + if res.err != nil { + r.l.Warn().Err(res.err).Str("node", res.addr).Msg("failed to query node") + continue + } + for _, s := range res.schemas { + propID := s.property.Id + rev := s.property.UpdatedAt.AsTime().UnixNano() + info, exists := propMap[propID] + if !exists { + info = &propInfo{ + nodeRev: make(map[string]int64), + nodeDelTime: make(map[string]int64), + } + propMap[propID] = info + } + if existingRev, ok := info.nodeRev[res.addr]; !ok || rev > existingRev { + info.nodeRev[res.addr] = rev + info.nodeDelTime[res.addr] = s.deleteTime + } + if info.best == nil || info.best.property.UpdatedAt.AsTime().UnixNano() < rev { + info.best = s + } + } + } + r.repairInconsistentNodes(ctx, nodeConns, propMap) + return propMap, nil +} + +func (r *SchemaRegistry) repairInconsistentNodes(ctx context.Context, nodeConns map[string]*nodeConnection, 
propMap map[string]*propInfo) { + if r.metrics != nil { + r.metrics.totalRepairStarted.Inc(1) + } + defer func() { + if r.metrics != nil { + r.metrics.totalRepairFinished.Inc(1) + } + }() + for propID, info := range propMap { + if info.best == nil { + continue + } + bestRev := info.best.property.UpdatedAt.AsTime().UnixNano() + var nodesToRepair []string + for addr := range nodeConns { + nodeRev, exists := info.nodeRev[addr] + if !exists || nodeRev < bestRev { + nodesToRepair = append(nodesToRepair, addr) + } + } + if len(nodesToRepair) == 0 { + continue + } + if r.metrics != nil { + r.metrics.totalRepairNodes.Inc(float64(len(nodesToRepair))) + } + r.l.Info().Str("propID", propID).Int64("bestRev", bestRev). + Int64("deleteTime", info.best.deleteTime). + Strs("nodesToRepair", nodesToRepair).Msg("repairing schema inconsistency") + var wg sync.WaitGroup + for _, addr := range nodesToRepair { + nc := nodeConns[addr] + wg.Add(1) + go func(nodeAddr string, client schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, repairErr := client.RepairSchema(ctx, &schemav1.RepairSchemaRequest{ + Property: info.best.property, + DeleteTime: info.best.deleteTime, + }) + if repairErr != nil { + r.l.Warn().Err(repairErr).Str("propID", propID).Str("node", nodeAddr).Msg("repair failed") + } else { + r.l.Info().Str("propID", propID).Str("node", nodeAddr).Msg("repaired successfully") + } + }(addr, nc.mgrClient) + } + wg.Wait() + } +} + +func (r *SchemaRegistry) insertToAllServers(ctx context.Context, prop *propertyv1.Property) error { + kind, group := extractKindAndGroupFromProperty(prop) + if r.metrics != nil { + r.metrics.totalWriteStarted.Inc(1, kind, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalWriteFinished.Inc(1, kind, group) + r.metrics.totalWriteLatency.Inc(time.Since(start).Seconds(), kind, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, 
group) + } + return errNoMetadataServers + } + var firstErr error + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, callErr := c.InsertSchema(ctx, &schemav1.InsertSchemaRequest{Property: prop}) + if callErr != nil { + mu.Lock() + if firstErr == nil { + firstErr = callErr + } + mu.Unlock() + } + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return firstErr +} + +func (r *SchemaRegistry) updateToAllServers(ctx context.Context, prop *propertyv1.Property) error { + kind, group := extractKindAndGroupFromProperty(prop) + if r.metrics != nil { + r.metrics.totalWriteStarted.Inc(1, kind, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalWriteFinished.Inc(1, kind, group) + r.metrics.totalWriteLatency.Inc(time.Since(start).Seconds(), kind, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return errNoMetadataServers + } + var firstErr error + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, callErr := c.UpdateSchema(ctx, &schemav1.UpdateSchemaRequest{Property: prop}) + if callErr != nil { + mu.Lock() + if firstErr == nil { + firstErr = callErr + } + mu.Unlock() + } + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return firstErr +} + +func (r *SchemaRegistry) deleteFromAllServers(ctx context.Context, kind schema.Kind, group, name string) (bool, error) { + kindStr := kind.String() + if r.metrics != nil { + r.metrics.totalDeleteStarted.Inc(1, kindStr, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + 
r.metrics.totalDeleteFinished.Inc(1, kindStr, group) + r.metrics.totalDeleteLatency.Inc(time.Since(start).Seconds(), kindStr, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalDeleteErr.Inc(1, kindStr, group) + } + return false, errNoMetadataServers + } + propID := BuildPropertyID(kind, &commonv1.Metadata{Group: group, Name: name}) + var firstErr error + var found bool + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + resp, callErr := c.DeleteSchema(ctx, &schemav1.DeleteSchemaRequest{ + Delete: &propertyv1.DeleteRequest{ + Group: SchemaGroup, + Name: kind.String(), + Id: propID, + }, + // update the latest update time for the notification + UpdateAt: timestamppb.Now(), + }) + mu.Lock() + if callErr != nil && firstErr == nil { + firstErr = callErr + } + if resp != nil && resp.Found { + found = true + } + mu.Unlock() + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalDeleteErr.Inc(1, kindStr, group) + } + return found, firstErr +} + +func (r *SchemaRegistry) initializeFromNode(nodeConn *nodeConnection) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + r.l.Info().Str("address", nodeConn.conn.Target()).Msg("initializing resources from metadata node") + groups, listErr := r.listGroupFromClient(ctx, nodeConn.mgrClient) + if listErr != nil { + r.l.Warn().Err(listErr).Msg("failed to list groups during initialization") + return + } + var maxRevision int64 + for _, group := range groups { + // init group + maxRevision = r.processInitialResource(schema.KindGroup, group, maxRevision) + + // init related resources + groupName := group.GetMetadata().GetName() + catalog := group.GetCatalog() + switch catalog { + case commonv1.Catalog_CATALOG_STREAM: + initResourceFromClient[*databasev1.Stream](ctx, r, 
nodeConn.mgrClient, schema.KindStream, groupName, &maxRevision) + case commonv1.Catalog_CATALOG_MEASURE: + initResourceFromClient[*databasev1.Measure](ctx, r, nodeConn.mgrClient, schema.KindMeasure, groupName, &maxRevision) + initResourceFromClient[*databasev1.TopNAggregation](ctx, r, nodeConn.mgrClient, schema.KindTopNAggregation, groupName, &maxRevision) + case commonv1.Catalog_CATALOG_TRACE: + initResourceFromClient[*databasev1.Trace](ctx, r, nodeConn.mgrClient, schema.KindTrace, groupName, &maxRevision) + case commonv1.Catalog_CATALOG_PROPERTY: + initResourceFromClient[*databasev1.Property](ctx, r, nodeConn.mgrClient, schema.KindProperty, groupName, &maxRevision) + } + if catalog != commonv1.Catalog_CATALOG_PROPERTY { + initResourceFromClient[*databasev1.IndexRule](ctx, r, nodeConn.mgrClient, schema.KindIndexRule, groupName, &maxRevision) + initResourceFromClient[*databasev1.IndexRuleBinding](ctx, r, nodeConn.mgrClient, schema.KindIndexRuleBinding, groupName, &maxRevision) + } + } + r.l.Info().Str("address", nodeConn.conn.Target()). 
+ Int64("latestUpdateAt", maxRevision).Msg("completed resource initialization") +} + +func (r *SchemaRegistry) processInitialResource(kind schema.Kind, spec proto.Message, currentMax int64) int64 { + prop, convErr := SchemaToProperty(kind, spec) + if convErr != nil { + r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to convert to property") + return currentMax + } + revision := prop.UpdatedAt.AsTime().UnixNano() + entry := &cacheEntry{ + latestUpdateAt: revision, + kind: kind, + group: getGroupFromTags(prop.Tags), + name: getNameFromTags(prop.Tags), + } + if r.cache.Update(prop.Id, entry) { + md := schema.Metadata{ + TypeMeta: schema.TypeMeta{ + Kind: kind, + Name: entry.name, + Group: entry.group, + ModRevision: prop.Metadata.ModRevision, + }, + Spec: spec, + } + r.notifyHandlers(kind, md, false) + } + if r.metrics != nil { + r.metrics.cacheSize.Set(float64(r.cache.Size())) + } + if revision > currentMax { + return revision + } + return currentMax +} + +func getGroupFromTags(tags []*modelv1.Tag) string { + for _, tag := range tags { + if tag.Key == TagKeyGroup { + return tag.Value.GetStr().GetValue() + } + } + return "" +} + +func getNameFromTags(tags []*modelv1.Tag) string { + for _, tag := range tags { + if tag.Key == TagKeyName { + return tag.Value.GetStr().GetValue() + } + } + return "" +} + +func (r *SchemaRegistry) notifyHandlers(kind schema.Kind, md schema.Metadata, isDelete bool) { + r.mu.RLock() + handlers := r.handlers[kind] + r.mu.RUnlock() + for _, h := range handlers { + if isDelete { + h.OnDelete(md) + } else { + h.OnAddOrUpdate(md) + } + } +} + +func (r *SchemaRegistry) addHandler(kind schema.Kind, handler schema.EventHandler) { + if r.handlers[kind] == nil { + r.handlers[kind] = make([]schema.EventHandler, 0) + } + r.handlers[kind] = append(r.handlers[kind], handler) +} + +// parsePropertyID parses property ID to get kind, group, name. +// Format: "kind_group/name" or "kind_name" (for Group/Node). 
+func parsePropertyID(propID string) (schema.Kind, string, string) { + underscoreIdx := strings.Index(propID, "_") + if underscoreIdx == -1 { + return 0, "", "" + } + kindStr := propID[:underscoreIdx] + rest := propID[underscoreIdx+1:] + kind, kindErr := KindFromString(kindStr) + if kindErr != nil { + return 0, "", "" + } + if kind == schema.KindGroup { + return kind, "", rest + } + slashIdx := strings.Index(rest, "/") + if slashIdx == -1 { + return kind, "", rest + } + return kind, rest[:slashIdx], rest[slashIdx+1:] +} + +func (r *SchemaRegistry) globalSync() { + if !r.syncCloser.AddRunning() { + return + } + defer r.syncCloser.Done() + ticker := time.NewTicker(r.syncInterval) + defer ticker.Stop() + for { + select { + case <-r.syncCloser.CloseNotify(): + return + case <-ticker.C: + r.performSync() + } + } +} + +func (r *SchemaRegistry) performSync() { + if r.metrics != nil { + r.metrics.totalSyncStarted.Inc(1) + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalSyncFinished.Inc(1) + r.metrics.totalSyncLatency.Inc(time.Since(start).Seconds()) + } + }() + r.l.Debug().Msg("performing global schema sync") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + clients := r.getClients() + if len(clients) == 0 { + return + } + for _, c := range clients { + sinceRevision := r.cache.GetMaxRevision() + updatedNames := r.queryUpdatedSchemas(ctx, c.updateClient, sinceRevision) + syncedKinds := make(map[schema.Kind]bool) + if len(updatedNames) > 0 { Review Comment: Perform a full reconciliation after several sync rounds. A revision is not reliable in an eventual system. The round times could be 3. ########## banyand/metadata/schema/property/client.go: ########## @@ -0,0 +1,1388 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +var errNoMetadataServers = errors.New("no metadata servers available") + +type clientMetrics struct { + totalSyncStarted meter.Counter + totalSyncFinished meter.Counter + totalSyncErr meter.Counter + totalSyncLatency meter.Counter + totalSyncUpdates meter.Counter + + totalQueryStarted meter.Counter + totalQueryFinished 
meter.Counter + totalQueryErr meter.Counter + totalQueryLatency meter.Counter + + totalRepairStarted meter.Counter + totalRepairFinished meter.Counter + totalRepairNodes meter.Counter + + totalWriteStarted meter.Counter + totalWriteFinished meter.Counter + totalWriteErr meter.Counter + totalWriteLatency meter.Counter + + totalDeleteStarted meter.Counter + totalDeleteFinished meter.Counter + totalDeleteErr meter.Counter + totalDeleteLatency meter.Counter + + cacheSize meter.Gauge +} + +func newClientMetrics(factory observability.Factory) *clientMetrics { + return &clientMetrics{ + totalSyncStarted: factory.NewCounter("property_sync_started"), + totalSyncFinished: factory.NewCounter("property_sync_finished"), + totalSyncErr: factory.NewCounter("property_sync_err"), + totalSyncLatency: factory.NewCounter("property_sync_latency"), + totalSyncUpdates: factory.NewCounter("property_sync_updates"), + + totalQueryStarted: factory.NewCounter("property_query_started", "method"), + totalQueryFinished: factory.NewCounter("property_query_finished", "method"), + totalQueryErr: factory.NewCounter("property_query_err", "method"), + totalQueryLatency: factory.NewCounter("property_query_latency", "method"), + + totalRepairStarted: factory.NewCounter("property_repair_started"), + totalRepairFinished: factory.NewCounter("property_repair_finished"), + totalRepairNodes: factory.NewCounter("property_repair_nodes"), + + totalWriteStarted: factory.NewCounter("property_write_started", "kind", "group"), + totalWriteFinished: factory.NewCounter("property_write_finished", "kind", "group"), + totalWriteErr: factory.NewCounter("property_write_err", "kind", "group"), + totalWriteLatency: factory.NewCounter("property_write_latency", "kind", "group"), + + totalDeleteStarted: factory.NewCounter("property_delete_started", "kind", "group"), + totalDeleteFinished: factory.NewCounter("property_delete_finished", "kind", "group"), + totalDeleteErr: factory.NewCounter("property_delete_err", "kind", 
"group"), + totalDeleteLatency: factory.NewCounter("property_delete_latency", "kind", "group"), + + cacheSize: factory.NewGauge("property_cache_size"), + } +} + +// nodeConnection represents a connection to a metadata node. +type nodeConnection struct { + mgrClient schemav1.SchemaManagementServiceClient + updateClient schemav1.SchemaUpdateServiceClient + conn *grpc.ClientConn +} + +// SchemaRegistry implements schema.Registry interface using property-based storage. +type SchemaRegistry struct { + dialOptsPrv common.GRPCDialOptionsProvider + handlers map[schema.Kind][]schema.EventHandler + nodeConns map[string]*nodeConnection + cache *schemaCache + closer *run.Closer + syncCloser *run.Closer + l *logger.Logger + metrics *clientMetrics + mu sync.RWMutex + connMu sync.RWMutex + grpcTimeout time.Duration + syncInterval time.Duration +} + +// NewSchemaRegistryClient creates a new property schema registry client. +// It accepts a NodeDiscovery service to dynamically discover and connect to metadata nodes. +func NewSchemaRegistryClient(dialOptsProvider common.GRPCDialOptionsProvider, grpcTimeout, syncInterval time.Duration) *SchemaRegistry { + r := &SchemaRegistry{ + dialOptsPrv: dialOptsProvider, + nodeConns: make(map[string]*nodeConnection), + handlers: make(map[schema.Kind][]schema.EventHandler), + cache: newSchemaCache(), + closer: run.NewCloser(1), + l: logger.GetLogger("property-schema-registry"), + grpcTimeout: grpcTimeout, + syncInterval: syncInterval, + } + return r +} + +// SetMetrics initializes the client metrics using the provided metrics registry. +func (r *SchemaRegistry) SetMetrics(omr observability.MetricsRegistry) { + if omr == nil { + return + } + clientScope := metadataScope.SubScope("client") + r.metrics = newClientMetrics(omr.With(clientScope)) +} + +// OnInit implements schema.EventHandler. 
+func (r *SchemaRegistry) OnInit(_ []schema.Kind) (bool, []int64) { + return false, nil +} + +// OnAddOrUpdate implements schema.EventHandler for getting the all metadata nodes. +func (r *SchemaRegistry) OnAddOrUpdate(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + containsMetadata := false + for _, role := range node.Roles { + if role == databasev1.Role_ROLE_META { + containsMetadata = true + break + } + } + if !containsMetadata { + return + } + r.addNodeConnection(node) +} + +// OnDelete implements schema.EventHandler for getting which metadata node has been deleted. +func (r *SchemaRegistry) OnDelete(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + r.removeNodeConnection(node.GetGrpcAddress()) +} + +func (r *SchemaRegistry) addNodeConnection(node *databasev1.Node) { + address := node.GetGrpcAddress() + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + if _, exists := r.nodeConns[address]; exists { + return + } + var dialOpts []grpc.DialOption + if r.dialOptsPrv != nil { + var optsErr error + dialOpts, optsErr = r.dialOptsPrv.GetDialOptions(address) + if optsErr != nil { + r.l.Warn().Err(optsErr).Str("address", address).Msg("failed to get dial options") + return + } + } + conn, connErr := grpchelper.Conn(address, r.grpcTimeout, dialOpts...) 
+ if connErr != nil { + r.l.Warn().Err(connErr).Str("address", address).Msg("failed to connect to metadata node") + return + } + schemaMgrClient := schemav1.NewSchemaManagementServiceClient(conn) + schemaUpdateClient := schemav1.NewSchemaUpdateServiceClient(conn) + nodeConn := &nodeConnection{ + mgrClient: schemaMgrClient, + updateClient: schemaUpdateClient, + conn: conn, + } + r.nodeConns[address] = nodeConn + r.l.Info().Str("address", address).Str("node", node.GetMetadata().GetName()).Msg("connected to metadata node") + go r.initializeFromNode(nodeConn) +} + +func (r *SchemaRegistry) removeNodeConnection(address string) { + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + nc, exists := r.nodeConns[address] + if !exists { + return + } + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", address).Msg("failed to close connection") + } + } + delete(r.nodeConns, address) + r.l.Info().Str("address", address).Msg("disconnected from metadata node") +} + +// Close closes the registry. +func (r *SchemaRegistry) Close() error { + r.closer.Done() + r.closer.CloseThenWait() + if r.syncCloser != nil { + r.syncCloser.Done() + r.syncCloser.CloseThenWait() + } + r.connMu.Lock() + defer r.connMu.Unlock() + for addr, nc := range r.nodeConns { + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", addr).Msg("failed to close connection") + } + } + } + r.nodeConns = make(map[string]*nodeConnection) + return nil +} + +// RegisterHandler registers an event handler for a schema kind. 
+func (r *SchemaRegistry) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) { + // Validate kind + if kind&schema.KindMask != kind { + panic(fmt.Sprintf("invalid kind %d", kind)) + } + r.mu.Lock() + defer r.mu.Unlock() + var kinds []schema.Kind + for i := 0; i < schema.KindSize; i++ { + ki := schema.Kind(1 << i) + if kind&ki > 0 { + kinds = append(kinds, ki) + } + } + r.l.Info().Str("name", name).Interface("kinds", kinds).Msg("registering handler") + handler.OnInit(kinds) + for _, ki := range kinds { + r.addHandler(ki, handler) + } +} + +// Register registers a metadata entry. +func (r *SchemaRegistry) Register(context.Context, schema.Metadata, bool) error { + panic("property based schema registry not support register") +} + +// Compact is not supported in property mode. +func (r *SchemaRegistry) Compact(context.Context, int64) error { + return nil +} + +// StartWatcher starts the global sync mechanism. +func (r *SchemaRegistry) StartWatcher() { + r.syncCloser = run.NewCloser(1) + go r.globalSync() +} + +// Stream methods. + +// GetStream retrieves a stream by metadata. +func (r *SchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error) { + return getResource[*databasev1.Stream](ctx, r, schema.KindStream, metadata.GetGroup(), metadata.GetName()) +} + +// ListStream lists streams in a group. +func (r *SchemaRegistry) ListStream(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Stream, error) { + return listResources[*databasev1.Stream](ctx, r, schema.KindStream, opt.Group, true) +} + +// CreateStream creates a new stream. +func (r *SchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { + now := time.Now().UnixNano() + stream.Metadata.ModRevision = now + stream.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindStream, stream) +} + +// UpdateStream updates an existing stream. 
+func (r *SchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { + now := time.Now().UnixNano() + stream.Metadata.ModRevision = now + stream.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindStream, stream) +} + +// DeleteStream deletes a stream. +func (r *SchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindStream, metadata.GetGroup(), metadata.GetName()) +} + +// Measure methods. + +// GetMeasure retrieves a measure by metadata. +func (r *SchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error) { + return getResource[*databasev1.Measure](ctx, r, schema.KindMeasure, metadata.GetGroup(), metadata.GetName()) +} + +// ListMeasure lists measures in a group. +func (r *SchemaRegistry) ListMeasure(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Measure, error) { + return listResources[*databasev1.Measure](ctx, r, schema.KindMeasure, opt.Group, true) +} + +// CreateMeasure creates a new measure. +func (r *SchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { + now := time.Now().UnixNano() + measure.Metadata.ModRevision = now + measure.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindMeasure, measure) +} + +// UpdateMeasure updates an existing measure. +func (r *SchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { + now := time.Now().UnixNano() + measure.Metadata.ModRevision = now + measure.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindMeasure, measure) +} + +// DeleteMeasure deletes a measure. 
+func (r *SchemaRegistry) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindMeasure, metadata.GetGroup(), metadata.GetName()) +} + +// TopNAggregations returns top-N aggregations for a measure. +func (r *SchemaRegistry) TopNAggregations(ctx context.Context, metadata *commonv1.Metadata) ([]*databasev1.TopNAggregation, error) { + aggregations, listErr := r.ListTopNAggregation(ctx, schema.ListOpt{Group: metadata.GetGroup()}) + if listErr != nil { + return nil, listErr + } + var result []*databasev1.TopNAggregation + for _, aggrDef := range aggregations { + if aggrDef.GetSourceMeasure().GetName() == metadata.GetName() { + result = append(result, aggrDef) + } + } + return result, nil +} + +// Trace methods. + +// GetTrace retrieves a trace by metadata. +func (r *SchemaRegistry) GetTrace(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Trace, error) { + return getResource[*databasev1.Trace](ctx, r, schema.KindTrace, metadata.GetGroup(), metadata.GetName()) +} + +// ListTrace lists traces in a group. +func (r *SchemaRegistry) ListTrace(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Trace, error) { + return listResources[*databasev1.Trace](ctx, r, schema.KindTrace, opt.Group, true) +} + +// CreateTrace creates a new trace. +func (r *SchemaRegistry) CreateTrace(ctx context.Context, trace *databasev1.Trace) (int64, error) { + now := time.Now().UnixNano() + trace.Metadata.ModRevision = now + trace.UpdatedAt = timestamppb.Now() + return now, createResource(ctx, r, schema.KindTrace, trace) +} + +// UpdateTrace updates an existing trace. +func (r *SchemaRegistry) UpdateTrace(ctx context.Context, trace *databasev1.Trace) (int64, error) { + now := time.Now().UnixNano() + trace.Metadata.ModRevision = now + trace.UpdatedAt = timestamppb.Now() + return now, updateResource(ctx, r, schema.KindTrace, trace) +} + +// DeleteTrace deletes a trace. 
+func (r *SchemaRegistry) DeleteTrace(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindTrace, metadata.GetGroup(), metadata.GetName()) +} + +// Group methods. + +// GetGroup retrieves a group by name. +func (r *SchemaRegistry) GetGroup(ctx context.Context, group string) (*commonv1.Group, error) { + return getResource[*commonv1.Group](ctx, r, schema.KindGroup, "", group) +} + +// ListGroup lists all groups. +func (r *SchemaRegistry) ListGroup(ctx context.Context) ([]*commonv1.Group, error) { + return listResources[*commonv1.Group](ctx, r, schema.KindGroup, "", false) +} + +// CreateGroup creates a new group. +func (r *SchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Group) error { + now := time.Now().UnixNano() + group.Metadata.ModRevision = now + group.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindGroup, group) +} + +// UpdateGroup updates an existing group. +func (r *SchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error { + now := time.Now().UnixNano() + group.Metadata.ModRevision = now + group.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindGroup, group) +} + +// DeleteGroup deletes a group. +func (r *SchemaRegistry) DeleteGroup(ctx context.Context, group string) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindGroup, "", group) +} + +// IndexRule methods. + +// GetIndexRule retrieves an index rule by metadata. +func (r *SchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error) { + return getResource[*databasev1.IndexRule](ctx, r, schema.KindIndexRule, metadata.GetGroup(), metadata.GetName()) +} + +// ListIndexRule lists index rules in a group. 
+func (r *SchemaRegistry) ListIndexRule(ctx context.Context, opt schema.ListOpt) ([]*databasev1.IndexRule, error) { + return listResources[*databasev1.IndexRule](ctx, r, schema.KindIndexRule, opt.Group, true) +} + +// CreateIndexRule creates a new index rule. +func (r *SchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error { + now := time.Now().UnixNano() + indexRule.Metadata.ModRevision = now + indexRule.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindIndexRule, indexRule) +} + +// UpdateIndexRule updates an existing index rule. +func (r *SchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error { + now := time.Now().UnixNano() + indexRule.Metadata.ModRevision = now + indexRule.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindIndexRule, indexRule) +} + +// DeleteIndexRule deletes an index rule. +func (r *SchemaRegistry) DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindIndexRule, metadata.GetGroup(), metadata.GetName()) +} + +// IndexRuleBinding methods. + +// GetIndexRuleBinding retrieves an index rule binding by metadata. +func (r *SchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error) { + return getResource[*databasev1.IndexRuleBinding](ctx, r, schema.KindIndexRuleBinding, metadata.GetGroup(), metadata.GetName()) +} + +// ListIndexRuleBinding lists index rule bindings in a group. +func (r *SchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt schema.ListOpt) ([]*databasev1.IndexRuleBinding, error) { + return listResources[*databasev1.IndexRuleBinding](ctx, r, schema.KindIndexRuleBinding, opt.Group, true) +} + +// CreateIndexRuleBinding creates a new index rule binding. 
+func (r *SchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error { + now := time.Now().UnixNano() + indexRuleBinding.Metadata.ModRevision = now + indexRuleBinding.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindIndexRuleBinding, indexRuleBinding) +} + +// UpdateIndexRuleBinding updates an existing index rule binding. +func (r *SchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error { + now := time.Now().UnixNano() + indexRuleBinding.Metadata.ModRevision = now + indexRuleBinding.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindIndexRuleBinding, indexRuleBinding) +} + +// DeleteIndexRuleBinding deletes an index rule binding. +func (r *SchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindIndexRuleBinding, metadata.GetGroup(), metadata.GetName()) +} + +// TopNAggregation methods. + +// GetTopNAggregation retrieves a top-N aggregation by metadata. +func (r *SchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.TopNAggregation, error) { + return getResource[*databasev1.TopNAggregation](ctx, r, schema.KindTopNAggregation, metadata.GetGroup(), metadata.GetName()) +} + +// ListTopNAggregation lists top-N aggregations in a group. +func (r *SchemaRegistry) ListTopNAggregation(ctx context.Context, opt schema.ListOpt) ([]*databasev1.TopNAggregation, error) { + return listResources[*databasev1.TopNAggregation](ctx, r, schema.KindTopNAggregation, opt.Group, true) +} + +// CreateTopNAggregation creates a new top-N aggregation. 
+func (r *SchemaRegistry) CreateTopNAggregation(ctx context.Context, topN *databasev1.TopNAggregation) error { + now := time.Now().UnixNano() + topN.Metadata.ModRevision = now + topN.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindTopNAggregation, topN) +} + +// UpdateTopNAggregation updates an existing top-N aggregation. +func (r *SchemaRegistry) UpdateTopNAggregation(ctx context.Context, topN *databasev1.TopNAggregation) error { + now := time.Now().UnixNano() + topN.Metadata.ModRevision = now + topN.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindTopNAggregation, topN) +} + +// DeleteTopNAggregation deletes a top-N aggregation. +func (r *SchemaRegistry) DeleteTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindTopNAggregation, metadata.GetGroup(), metadata.GetName()) +} + +// Node methods. + +// ListNode lists nodes by role. +func (r *SchemaRegistry) ListNode(context.Context, databasev1.Role) ([]*databasev1.Node, error) { + panic("property based schema registry does not support list node") +} + +// RegisterNode registers a node. +func (r *SchemaRegistry) RegisterNode(context.Context, *databasev1.Node, bool) error { + panic("property based schema registry does not support register node") +} + +// GetNode retrieves a node by name. +func (r *SchemaRegistry) GetNode(context.Context, string) (*databasev1.Node, error) { + panic("property based schema registry does not support get node") +} + +// UpdateNode updates a node. +func (r *SchemaRegistry) UpdateNode(context.Context, *databasev1.Node) error { + panic("property based schema registry does not support update node") +} + +// Property methods. + +// GetProperty retrieves a property by metadata. 
+func (r *SchemaRegistry) GetProperty(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Property, error) { + return getResource[*databasev1.Property](ctx, r, schema.KindProperty, metadata.GetGroup(), metadata.GetName()) +} + +// ListProperty lists properties in a group. +func (r *SchemaRegistry) ListProperty(ctx context.Context, opt schema.ListOpt) ([]*databasev1.Property, error) { + return listResources[*databasev1.Property](ctx, r, schema.KindProperty, opt.Group, true) +} + +// CreateProperty creates a new property. +func (r *SchemaRegistry) CreateProperty(ctx context.Context, property *databasev1.Property) error { + now := time.Now().UnixNano() + property.Metadata.ModRevision = now + property.UpdatedAt = timestamppb.Now() + return createResource(ctx, r, schema.KindProperty, property) +} + +// UpdateProperty updates an existing property. +func (r *SchemaRegistry) UpdateProperty(ctx context.Context, property *databasev1.Property) error { + now := time.Now().UnixNano() + property.Metadata.ModRevision = now + property.UpdatedAt = timestamppb.Now() + return updateResource(ctx, r, schema.KindProperty, property) +} + +// DeleteProperty deletes a property. 
+func (r *SchemaRegistry) DeleteProperty(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { + return r.deleteFromAllServers(ctx, schema.KindProperty, metadata.GetGroup(), metadata.GetName()) +} + +func getResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, group, name string) (T, error) { + var zero T + prop, getErr := r.getSchema(ctx, kind, group, name) + if getErr != nil { + return zero, getErr + } + if prop == nil { + return zero, schema.ErrGRPCResourceNotFound + } + md, convErr := ToSchema(kind, prop) + if convErr != nil { + return zero, convErr + } + result, ok := md.Spec.(T) + if !ok { + return zero, errors.Errorf("unexpected spec type for kind %s", kind) + } + return result, nil +} + +func listResources[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, group string, requireGroup bool) ([]T, error) { + if requireGroup && group == "" { + return nil, schema.BadRequest("group", "group should not be empty") + } + props, listErr := r.listSchemas(ctx, kind, group) + if listErr != nil { + return nil, listErr + } + results := make([]T, 0, len(props)) + for _, prop := range props { + md, convErr := ToSchema(kind, prop) + if convErr != nil { + r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("failed to convert property") + continue + } + result, ok := md.Spec.(T) + if !ok { + r.l.Warn().Stringer("kind", kind).Msg("unexpected spec type") + continue + } + results = append(results, result) + } + return results, nil +} + +func initResourceFromClient[T proto.Message](ctx context.Context, r *SchemaRegistry, client schemav1.SchemaManagementServiceClient, + kind schema.Kind, group string, maxRevision *int64, +) { + props, listErr := r.listSchemasFromClient(ctx, client, kind, group) + if listErr != nil { + r.l.Warn().Err(listErr).Str("group", group).Msg("failed to list streams") + return + } + for _, prop := range props { + md, convErr := ToSchema(kind, prop) + if convErr != nil { + 
r.l.Warn().Err(convErr).Msgf("failed to convert property to %s", kind) + continue + } + *maxRevision = r.processInitialResource(kind, md.Spec.(T), *maxRevision) + } +} + +func createResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, spec T) error { + metadata, err := getMetadataFromSpec(kind, spec) + if err != nil { + return err + } + originalSchema, err := r.getSchema(ctx, kind, metadata.Group, metadata.Name) + if err != nil { + return err + } + if originalSchema != nil { + return fmt.Errorf("schema %s/%s already exist", metadata.Group, metadata.Name) + } + prop, convErr := SchemaToProperty(kind, spec) + if convErr != nil { + return convErr + } + return r.insertToAllServers(ctx, prop) +} + +func updateResource[T proto.Message](ctx context.Context, r *SchemaRegistry, kind schema.Kind, spec T) error { + metadata, err := getMetadataFromSpec(kind, spec) + if err != nil { + return err + } + originalSchema, err := r.getSchema(ctx, kind, metadata.Group, metadata.Name) + if err != nil { + return err + } + if originalSchema == nil { + return fmt.Errorf("schema %s/%s not exist", metadata.Group, metadata.Name) + } + prop, convErr := SchemaToProperty(kind, spec) + if convErr != nil { + return convErr + } + return r.updateToAllServers(ctx, prop) +} + +func (r *SchemaRegistry) getSchema(ctx context.Context, kind schema.Kind, group, name string) (*propertyv1.Property, error) { + propID := BuildPropertyID(kind, &commonv1.Metadata{Group: group, Name: name}) + query := buildSchemaQuery(kind, group, []string{propID}) + propMap, queryErr := r.queryAndRepairSchemas(ctx, query) + if queryErr != nil { + return nil, queryErr + } + info := propMap[propID] + if info == nil || info.best == nil || info.best.deleteTime > 0 { + return nil, nil + } + return info.best.property, nil +} + +type schemaWithDeleteTime struct { + property *propertyv1.Property + deleteTime int64 +} + +type propInfo struct { + nodeRev map[string]int64 + nodeDelTime map[string]int64 + 
best *schemaWithDeleteTime +} + +func (r *SchemaRegistry) listSchemas(ctx context.Context, kind schema.Kind, group string) ([]*propertyv1.Property, error) { + query := buildSchemaQuery(kind, group, nil) + propMap, queryErr := r.queryAndRepairSchemas(ctx, query) + if queryErr != nil { + return nil, queryErr + } + result := make([]*propertyv1.Property, 0, len(propMap)) + for _, info := range propMap { + if info.best != nil && info.best.deleteTime == 0 { + result = append(result, info.best.property) + } + } + return result, nil +} + +func (r *SchemaRegistry) getNodeConnectionsWithAddr() map[string]*nodeConnection { + r.connMu.RLock() + defer r.connMu.RUnlock() + result := make(map[string]*nodeConnection, len(r.nodeConns)) + for addr, nc := range r.nodeConns { + result[addr] = nc + } + return result +} + +func (r *SchemaRegistry) querySchemasFromClient(ctx context.Context, client schemav1.SchemaManagementServiceClient, + query *propertyv1.QueryRequest, +) ([]*schemaWithDeleteTime, error) { + resp, listErr := client.ListSchemas(ctx, &schemav1.ListSchemasRequest{Query: query}) + if listErr != nil { + return nil, listErr + } + results := make([]*schemaWithDeleteTime, 0, len(resp.Properties)) + for idx, prop := range resp.Properties { + var deleteTime int64 + if idx < len(resp.DeleteTimes) { + deleteTime = resp.DeleteTimes[idx] + } + results = append(results, &schemaWithDeleteTime{ + property: prop, + deleteTime: deleteTime, + }) + } + return results, nil +} + +func buildSchemaQuery(kind schema.Kind, group string, ids []string) *propertyv1.QueryRequest { + query := &propertyv1.QueryRequest{ + Groups: []string{SchemaGroup}, + Name: kind.String(), + Ids: ids, + Limit: 10000, + } + if group != "" { + query.Criteria = &modelv1.Criteria{ + Exp: &modelv1.Criteria_Condition{ + Condition: &modelv1.Condition{ + Name: TagKeyGroup, + Op: modelv1.Condition_BINARY_OP_EQ, + Value: &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: group}}, + }, + }, + }, + } + } 
+ return query +} + +type schemaQueryNodeResult struct { + err error + addr string + schemas []*schemaWithDeleteTime +} + +func (r *SchemaRegistry) queryAndRepairSchemas(ctx context.Context, query *propertyv1.QueryRequest) (map[string]*propInfo, error) { + if r.metrics != nil { + r.metrics.totalQueryStarted.Inc(1, "list") + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalQueryFinished.Inc(1, "list") + r.metrics.totalQueryLatency.Inc(time.Since(start).Seconds(), "list") + } + }() + nodeConns := r.getNodeConnectionsWithAddr() + if len(nodeConns) == 0 { + if r.metrics != nil { + r.metrics.totalQueryErr.Inc(1, "list") + } + return nil, errNoMetadataServers + } + resultCh := make(chan schemaQueryNodeResult, len(nodeConns)) + for addr, nc := range nodeConns { + go func(address string, client schemav1.SchemaManagementServiceClient) { + schemas, queryErr := r.querySchemasFromClient(ctx, client, query) + resultCh <- schemaQueryNodeResult{addr: address, schemas: schemas, err: queryErr} + }(addr, nc.mgrClient) + } + propMap := make(map[string]*propInfo) + for range len(nodeConns) { + res := <-resultCh + if res.err != nil { + r.l.Warn().Err(res.err).Str("node", res.addr).Msg("failed to query node") + continue + } + for _, s := range res.schemas { + propID := s.property.Id + rev := s.property.UpdatedAt.AsTime().UnixNano() + info, exists := propMap[propID] + if !exists { + info = &propInfo{ + nodeRev: make(map[string]int64), + nodeDelTime: make(map[string]int64), + } + propMap[propID] = info + } + if existingRev, ok := info.nodeRev[res.addr]; !ok || rev > existingRev { + info.nodeRev[res.addr] = rev + info.nodeDelTime[res.addr] = s.deleteTime + } + if info.best == nil || info.best.property.UpdatedAt.AsTime().UnixNano() < rev { + info.best = s + } + } + } + r.repairInconsistentNodes(ctx, nodeConns, propMap) + return propMap, nil +} + +func (r *SchemaRegistry) repairInconsistentNodes(ctx context.Context, nodeConns map[string]*nodeConnection, 
propMap map[string]*propInfo) { + if r.metrics != nil { + r.metrics.totalRepairStarted.Inc(1) + } + defer func() { + if r.metrics != nil { + r.metrics.totalRepairFinished.Inc(1) + } + }() + for propID, info := range propMap { + if info.best == nil { + continue + } + bestRev := info.best.property.UpdatedAt.AsTime().UnixNano() + var nodesToRepair []string + for addr := range nodeConns { + nodeRev, exists := info.nodeRev[addr] + if !exists || nodeRev < bestRev { + nodesToRepair = append(nodesToRepair, addr) + } + } + if len(nodesToRepair) == 0 { + continue + } + if r.metrics != nil { + r.metrics.totalRepairNodes.Inc(float64(len(nodesToRepair))) + } + r.l.Info().Str("propID", propID).Int64("bestRev", bestRev). + Int64("deleteTime", info.best.deleteTime). + Strs("nodesToRepair", nodesToRepair).Msg("repairing schema inconsistency") + var wg sync.WaitGroup + for _, addr := range nodesToRepair { + nc := nodeConns[addr] + wg.Add(1) + go func(nodeAddr string, client schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, repairErr := client.RepairSchema(ctx, &schemav1.RepairSchemaRequest{ + Property: info.best.property, + DeleteTime: info.best.deleteTime, + }) + if repairErr != nil { + r.l.Warn().Err(repairErr).Str("propID", propID).Str("node", nodeAddr).Msg("repair failed") + } else { + r.l.Info().Str("propID", propID).Str("node", nodeAddr).Msg("repaired successfully") + } + }(addr, nc.mgrClient) + } + wg.Wait() + } +} + +func (r *SchemaRegistry) insertToAllServers(ctx context.Context, prop *propertyv1.Property) error { + kind, group := extractKindAndGroupFromProperty(prop) + if r.metrics != nil { + r.metrics.totalWriteStarted.Inc(1, kind, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalWriteFinished.Inc(1, kind, group) + r.metrics.totalWriteLatency.Inc(time.Since(start).Seconds(), kind, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, 
group) + } + return errNoMetadataServers + } + var firstErr error + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, callErr := c.InsertSchema(ctx, &schemav1.InsertSchemaRequest{Property: prop}) + if callErr != nil { + mu.Lock() + if firstErr == nil { + firstErr = callErr + } + mu.Unlock() + } + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return firstErr +} + +func (r *SchemaRegistry) updateToAllServers(ctx context.Context, prop *propertyv1.Property) error { + kind, group := extractKindAndGroupFromProperty(prop) + if r.metrics != nil { + r.metrics.totalWriteStarted.Inc(1, kind, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + r.metrics.totalWriteFinished.Inc(1, kind, group) + r.metrics.totalWriteLatency.Inc(time.Since(start).Seconds(), kind, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return errNoMetadataServers + } + var firstErr error + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + _, callErr := c.UpdateSchema(ctx, &schemav1.UpdateSchemaRequest{Property: prop}) + if callErr != nil { + mu.Lock() + if firstErr == nil { + firstErr = callErr + } + mu.Unlock() + } + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalWriteErr.Inc(1, kind, group) + } + return firstErr +} + +func (r *SchemaRegistry) deleteFromAllServers(ctx context.Context, kind schema.Kind, group, name string) (bool, error) { + kindStr := kind.String() + if r.metrics != nil { + r.metrics.totalDeleteStarted.Inc(1, kindStr, group) + } + start := time.Now() + defer func() { + if r.metrics != nil { + 
r.metrics.totalDeleteFinished.Inc(1, kindStr, group) + r.metrics.totalDeleteLatency.Inc(time.Since(start).Seconds(), kindStr, group) + } + }() + clients := r.getClients() + if len(clients) == 0 { + if r.metrics != nil { + r.metrics.totalDeleteErr.Inc(1, kindStr, group) + } + return false, errNoMetadataServers + } + propID := BuildPropertyID(kind, &commonv1.Metadata{Group: group, Name: name}) + var firstErr error + var found bool + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c schemav1.SchemaManagementServiceClient) { + defer wg.Done() + resp, callErr := c.DeleteSchema(ctx, &schemav1.DeleteSchemaRequest{ + Delete: &propertyv1.DeleteRequest{ + Group: SchemaGroup, + Name: kind.String(), + Id: propID, + }, + // update the latest update time for the notification + UpdateAt: timestamppb.Now(), + }) + mu.Lock() + if callErr != nil && firstErr == nil { + firstErr = callErr + } + if resp != nil && resp.Found { + found = true + } + mu.Unlock() + }(client.mgrClient) + } + wg.Wait() + if firstErr != nil && r.metrics != nil { + r.metrics.totalDeleteErr.Inc(1, kindStr, group) + } + return found, firstErr +} + +func (r *SchemaRegistry) initializeFromNode(nodeConn *nodeConnection) { Review Comment: 1. Use a channel to prevent concurrent initialization from different nodes. 2. Use maxRevision to load the properties whose revision is greater than it, to reduce the initialization traffic storm. ########## banyand/metadata/schema/property/client.go: ########## @@ -0,0 +1,1388 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +var errNoMetadataServers = errors.New("no metadata servers available") + +type clientMetrics struct { + totalSyncStarted meter.Counter + totalSyncFinished meter.Counter + totalSyncErr meter.Counter + totalSyncLatency meter.Counter + totalSyncUpdates meter.Counter + + totalQueryStarted meter.Counter + totalQueryFinished meter.Counter + totalQueryErr meter.Counter + totalQueryLatency meter.Counter + + totalRepairStarted meter.Counter + totalRepairFinished meter.Counter + totalRepairNodes meter.Counter + + 
totalWriteStarted meter.Counter + totalWriteFinished meter.Counter + totalWriteErr meter.Counter + totalWriteLatency meter.Counter + + totalDeleteStarted meter.Counter + totalDeleteFinished meter.Counter + totalDeleteErr meter.Counter + totalDeleteLatency meter.Counter + + cacheSize meter.Gauge +} + +func newClientMetrics(factory observability.Factory) *clientMetrics { + return &clientMetrics{ + totalSyncStarted: factory.NewCounter("property_sync_started"), + totalSyncFinished: factory.NewCounter("property_sync_finished"), + totalSyncErr: factory.NewCounter("property_sync_err"), + totalSyncLatency: factory.NewCounter("property_sync_latency"), + totalSyncUpdates: factory.NewCounter("property_sync_updates"), + + totalQueryStarted: factory.NewCounter("property_query_started", "method"), + totalQueryFinished: factory.NewCounter("property_query_finished", "method"), + totalQueryErr: factory.NewCounter("property_query_err", "method"), + totalQueryLatency: factory.NewCounter("property_query_latency", "method"), + + totalRepairStarted: factory.NewCounter("property_repair_started"), + totalRepairFinished: factory.NewCounter("property_repair_finished"), + totalRepairNodes: factory.NewCounter("property_repair_nodes"), + + totalWriteStarted: factory.NewCounter("property_write_started", "kind", "group"), + totalWriteFinished: factory.NewCounter("property_write_finished", "kind", "group"), + totalWriteErr: factory.NewCounter("property_write_err", "kind", "group"), + totalWriteLatency: factory.NewCounter("property_write_latency", "kind", "group"), + + totalDeleteStarted: factory.NewCounter("property_delete_started", "kind", "group"), + totalDeleteFinished: factory.NewCounter("property_delete_finished", "kind", "group"), + totalDeleteErr: factory.NewCounter("property_delete_err", "kind", "group"), + totalDeleteLatency: factory.NewCounter("property_delete_latency", "kind", "group"), + + cacheSize: factory.NewGauge("property_cache_size"), + } +} + +// nodeConnection represents a 
connection to a metadata node. +type nodeConnection struct { + mgrClient schemav1.SchemaManagementServiceClient + updateClient schemav1.SchemaUpdateServiceClient + conn *grpc.ClientConn +} + +// SchemaRegistry implements schema.Registry interface using property-based storage. +type SchemaRegistry struct { + dialOptsPrv common.GRPCDialOptionsProvider + handlers map[schema.Kind][]schema.EventHandler + nodeConns map[string]*nodeConnection + cache *schemaCache + closer *run.Closer + syncCloser *run.Closer + l *logger.Logger + metrics *clientMetrics + mu sync.RWMutex + connMu sync.RWMutex + grpcTimeout time.Duration + syncInterval time.Duration +} + +// NewSchemaRegistryClient creates a new property schema registry client. +// It accepts a NodeDiscovery service to dynamically discover and connect to metadata nodes. +func NewSchemaRegistryClient(dialOptsProvider common.GRPCDialOptionsProvider, grpcTimeout, syncInterval time.Duration) *SchemaRegistry { + r := &SchemaRegistry{ + dialOptsPrv: dialOptsProvider, + nodeConns: make(map[string]*nodeConnection), + handlers: make(map[schema.Kind][]schema.EventHandler), + cache: newSchemaCache(), + closer: run.NewCloser(1), + l: logger.GetLogger("property-schema-registry"), + grpcTimeout: grpcTimeout, + syncInterval: syncInterval, + } + return r +} + +// SetMetrics initializes the client metrics using the provided metrics registry. +func (r *SchemaRegistry) SetMetrics(omr observability.MetricsRegistry) { + if omr == nil { + return + } + clientScope := metadataScope.SubScope("client") + r.metrics = newClientMetrics(omr.With(clientScope)) +} + +// OnInit implements schema.EventHandler. +func (r *SchemaRegistry) OnInit(_ []schema.Kind) (bool, []int64) { + return false, nil +} + +// OnAddOrUpdate implements schema.EventHandler for getting the all metadata nodes. 
+func (r *SchemaRegistry) OnAddOrUpdate(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + containsMetadata := false + for _, role := range node.Roles { + if role == databasev1.Role_ROLE_META { + containsMetadata = true + break + } + } + if !containsMetadata { + return + } + r.addNodeConnection(node) +} + +// OnDelete implements schema.EventHandler for getting which metadata node has been deleted. +func (r *SchemaRegistry) OnDelete(m schema.Metadata) { + if m.Kind != schema.KindNode { + return + } + node, ok := m.Spec.(*databasev1.Node) + if !ok { + return + } + r.removeNodeConnection(node.GetGrpcAddress()) +} + +func (r *SchemaRegistry) addNodeConnection(node *databasev1.Node) { + address := node.GetGrpcAddress() + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + if _, exists := r.nodeConns[address]; exists { + return + } + var dialOpts []grpc.DialOption + if r.dialOptsPrv != nil { + var optsErr error + dialOpts, optsErr = r.dialOptsPrv.GetDialOptions(address) + if optsErr != nil { + r.l.Warn().Err(optsErr).Str("address", address).Msg("failed to get dial options") + return + } + } + conn, connErr := grpchelper.Conn(address, r.grpcTimeout, dialOpts...) 
+ if connErr != nil { + r.l.Warn().Err(connErr).Str("address", address).Msg("failed to connect to metadata node") + return + } + schemaMgrClient := schemav1.NewSchemaManagementServiceClient(conn) + schemaUpdateClient := schemav1.NewSchemaUpdateServiceClient(conn) + nodeConn := &nodeConnection{ + mgrClient: schemaMgrClient, + updateClient: schemaUpdateClient, + conn: conn, + } + r.nodeConns[address] = nodeConn + r.l.Info().Str("address", address).Str("node", node.GetMetadata().GetName()).Msg("connected to metadata node") + go r.initializeFromNode(nodeConn) +} + +func (r *SchemaRegistry) removeNodeConnection(address string) { + if address == "" { + return + } + r.connMu.Lock() + defer r.connMu.Unlock() + nc, exists := r.nodeConns[address] + if !exists { + return + } + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", address).Msg("failed to close connection") + } + } + delete(r.nodeConns, address) + r.l.Info().Str("address", address).Msg("disconnected from metadata node") +} + +// Close closes the registry. +func (r *SchemaRegistry) Close() error { + r.closer.Done() + r.closer.CloseThenWait() + if r.syncCloser != nil { + r.syncCloser.Done() + r.syncCloser.CloseThenWait() + } + r.connMu.Lock() + defer r.connMu.Unlock() + for addr, nc := range r.nodeConns { + if nc.conn != nil { + if closeErr := nc.conn.Close(); closeErr != nil { + r.l.Warn().Err(closeErr).Str("address", addr).Msg("failed to close connection") + } + } + } + r.nodeConns = make(map[string]*nodeConnection) + return nil +} + +// RegisterHandler registers an event handler for a schema kind. 
+func (r *SchemaRegistry) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) { + // Validate kind + if kind&schema.KindMask != kind { + panic(fmt.Sprintf("invalid kind %d", kind)) + } + r.mu.Lock() + defer r.mu.Unlock() + var kinds []schema.Kind + for i := 0; i < schema.KindSize; i++ { + ki := schema.Kind(1 << i) + if kind&ki > 0 { + kinds = append(kinds, ki) + } + } + r.l.Info().Str("name", name).Interface("kinds", kinds).Msg("registering handler") + handler.OnInit(kinds) Review Comment: ```suggestion ``` Do not do it since the property-based registry doesn't support watching. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
