Copilot commented on code in PR #982:
URL:
https://github.com/apache/skywalking-banyandb/pull/982#discussion_r2867261284
##########
banyand/metadata/service/server.go:
##########
@@ -42,100 +46,165 @@ var (
schemaTypeProperty = "property"
)
+// Service extends metadata.Service with schema server port access.
+type Service interface {
+ metadata.Service
+ GetSchemaServerPort() *uint32
+ GetSchemaGossipPort() *uint32
+ SetPropertyPipelineClient(queue.Client)
+}
+
type server struct {
metadata.Service
etcdServer embeddedetcd.Server
propServer schemaserver.Server
+ repairSvc schemaserver.RepairService
+ pipelineClient queue.Client
+ omr observability.MetricsRegistry
+ serviceFlags *run.FlagSet
scheduler *timestamp.Scheduler
ecli *clientv3.Client
closer *run.Closer
rootDir string
defragCron string
autoCompactionMode string
autoCompactionRetention string
- schemaStorageType string
+ schemaRegistryMode string
+ nodeDiscoveryMode string
listenClientURL []string
listenPeerURL []string
quotaBackendBytes run.Bytes
+ embedded bool
}
func (s *server) Name() string {
return "metadata"
}
func (s *server) Role() databasev1.Role {
- return databasev1.Role_ROLE_META
+ if s.schemaRegistryMode == schemaTypeProperty {
+ return databasev1.Role_ROLE_META
+ }
+ return databasev1.Role_ROLE_UNSPECIFIED
}
func (s *server) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
- fs.StringVar(&s.schemaStorageType, "schema-storage-type",
schemaTypeEtcd, "schema storage type: embedetcd or property")
- fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path
of metadata")
- fs.StringVar(&s.autoCompactionMode, "etcd-auto-compaction-mode",
"periodic", "auto compaction mode: 'periodic' or 'revision'")
- fs.StringVar(&s.autoCompactionRetention,
"etcd-auto-compaction-retention", "1h", "auto compaction retention: e.g. '1h',
'30m', '24h' for periodic; '1000' for revision")
- fs.StringVar(&s.defragCron, "etcd-defrag-cron", "@daily",
"defragmentation cron: e.g. '@daily', '@hourly', '0 0 * * 0', '0 */6 * * *'")
- fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
- fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
- fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota
for backend storage")
- if s.propServer != nil {
- fs.AddFlagSet(s.propServer.FlagSet().FlagSet)
+ fs.StringVar(&s.schemaRegistryMode, "schema-registry-mode",
schemaTypeEtcd,
+ "Schema registry mode: 'etcd' for etcd-based storage,
'property' for property-based storage")
+ fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode",
metadata.NodeDiscoveryModeEtcd,
+ "Node discovery mode: 'etcd' for etcd-based, 'dns' for
DNS-based, 'file' for file-based")
Review Comment:
`FlagSet()` defines `node-discovery-mode` on this wrapper flagset and later
adds the underlying `s.Service.FlagSet()` which already defines the same flag
(see `banyand/metadata/client.go`). Adding both flagsets will cause a duplicate
flag definition error at runtime. Consider removing the wrapper-level
`node-discovery-mode` flag and relying on the embedded service's flag, or
otherwise avoid adding two flagsets that both define the same flag name.
```suggestion
```
##########
banyand/metadata/service/server.go:
##########
@@ -42,100 +46,165 @@ var (
schemaTypeProperty = "property"
Review Comment:
`schemaTypeEtcd` is set to `"embedetcd"`, but the `--schema-registry-mode`
help text describes the etcd option as `'etcd'` and the `PreRun` switch only
recognizes `"embedetcd"`. This mismatch will confuse users and cause
`--schema-registry-mode=etcd` to fail. Consider using `"etcd"` as the canonical
value (optionally supporting `embedetcd` as an alias) or updating the help text
to explicitly document `embedetcd`.
##########
banyand/metadata/service/server.go:
##########
@@ -42,100 +46,165 @@ var (
schemaTypeProperty = "property"
)
+// Service extends metadata.Service with schema server port access.
+type Service interface {
+ metadata.Service
+ GetSchemaServerPort() *uint32
+ GetSchemaGossipPort() *uint32
+ SetPropertyPipelineClient(queue.Client)
+}
+
type server struct {
metadata.Service
etcdServer embeddedetcd.Server
propServer schemaserver.Server
+ repairSvc schemaserver.RepairService
+ pipelineClient queue.Client
+ omr observability.MetricsRegistry
+ serviceFlags *run.FlagSet
scheduler *timestamp.Scheduler
ecli *clientv3.Client
closer *run.Closer
rootDir string
defragCron string
autoCompactionMode string
autoCompactionRetention string
- schemaStorageType string
+ schemaRegistryMode string
+ nodeDiscoveryMode string
listenClientURL []string
listenPeerURL []string
quotaBackendBytes run.Bytes
+ embedded bool
}
func (s *server) Name() string {
return "metadata"
}
func (s *server) Role() databasev1.Role {
- return databasev1.Role_ROLE_META
+ if s.schemaRegistryMode == schemaTypeProperty {
Review Comment:
`Role()` now returns `ROLE_UNSPECIFIED` unless `schemaRegistryMode ==
"property"`. In standalone/embedded etcd mode this means the node roles
assembled by `run.Group` will no longer include `ROLE_META`, so the metadata
node will not advertise itself as a META node when it registers. If the META
role is still required when running the embedded metadata service, consider
returning `ROLE_META` when `s.embedded` is true (or otherwise decoupling the
role from `schemaRegistryMode`).
```suggestion
if s.schemaRegistryMode == schemaTypeProperty || s.embedded {
```
##########
banyand/metadata/schema/schemaserver/repair_service.go:
##########
@@ -0,0 +1,194 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/robfig/cron/v3"
+)
+
+// RepairService is the independent schema repair service.
+type RepairService interface {
+ run.PreRunner
+ run.Config
+ run.Service
+ GetGossipPort() *uint32
+}
+
+type repairService struct {
+ schema.UnimplementedOnInitHandler
+ registerGossip func(gossip.Messenger)
+ metadata metadata.Repo
+ messenger gossip.Messenger
+ scheduler *timestamp.Scheduler
+ closer *run.Closer
+ l *logger.Logger
+ metaNodes map[string]struct{}
+ mu sync.RWMutex
+ repairCron string
+}
+
+// NewGossipService creates a new schema repair service.
+func NewGossipService(registerGossip func(gossip.Messenger), metadata
metadata.Repo,
+ pipelineClient queue.Client, omr observability.MetricsRegistry,
+) RepairService {
+ return &repairService{
+ registerGossip: registerGossip,
+ metadata: metadata,
+ closer: run.NewCloser(1),
+ metaNodes: make(map[string]struct{}),
+ messenger: gossip.NewMessenger(
+ "schema_property",
Review Comment:
The gossip messenger prefix is set to `"schema_property"`, which results in
flag names like `schema_property-gossip-grpc-host` (underscore in the flag
name). Elsewhere flags use kebab-case prefixes (e.g.
`property-repair-gossip-*`). Consider switching this prefix to a hyphenated
form (e.g. `schema-property` or `schema-repair`) to keep flag naming consistent
and avoid underscores in CLI flags/metrics scopes.
```suggestion
"schema-property",
```
##########
banyand/property/service.go:
##########
@@ -248,6 +248,8 @@ func NewService(metadata metadata.Repo, pipeline
queue.Server, pipelineClient qu
pm: pm,
closer: run.NewCloser(0),
- gossipMessenger: gossip.NewMessenger(omr, metadata,
pipelineClient),
+ gossipMessenger: gossip.NewMessenger("property-repair",
+ func(n *databasev1.Node) string { return
n.PropertyRepairGossipGrpcAddress },
+ omr, metadata, pipelineClient, 17932),
Review Comment:
`pipelineClient` may be nil in some setups (e.g. standalone currently passes
nil), but the gossip messenger's `PreRun` can call tracing initialization that
uses `s.pipeline.Register(...)` (see `banyand/property/gossip/trace.go`), which
will panic if the pipeline client is nil. Either require a non-nil
`pipelineClient` here (pass `pipeline` if it implements `queue.Client`), or
guard tracing/pipeline usage in the gossip messenger when `pipelineClient ==
nil`.
##########
banyand/metadata/service/server.go:
##########
@@ -42,100 +46,165 @@ var (
schemaTypeProperty = "property"
)
+// Service extends metadata.Service with schema server port access.
+type Service interface {
+ metadata.Service
+ GetSchemaServerPort() *uint32
+ GetSchemaGossipPort() *uint32
+ SetPropertyPipelineClient(queue.Client)
+}
+
type server struct {
metadata.Service
etcdServer embeddedetcd.Server
propServer schemaserver.Server
+ repairSvc schemaserver.RepairService
+ pipelineClient queue.Client
+ omr observability.MetricsRegistry
+ serviceFlags *run.FlagSet
scheduler *timestamp.Scheduler
ecli *clientv3.Client
closer *run.Closer
rootDir string
defragCron string
autoCompactionMode string
autoCompactionRetention string
- schemaStorageType string
+ schemaRegistryMode string
+ nodeDiscoveryMode string
listenClientURL []string
listenPeerURL []string
quotaBackendBytes run.Bytes
+ embedded bool
}
func (s *server) Name() string {
return "metadata"
}
func (s *server) Role() databasev1.Role {
- return databasev1.Role_ROLE_META
+ if s.schemaRegistryMode == schemaTypeProperty {
+ return databasev1.Role_ROLE_META
+ }
+ return databasev1.Role_ROLE_UNSPECIFIED
}
func (s *server) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
- fs.StringVar(&s.schemaStorageType, "schema-storage-type",
schemaTypeEtcd, "schema storage type: embedetcd or property")
- fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path
of metadata")
- fs.StringVar(&s.autoCompactionMode, "etcd-auto-compaction-mode",
"periodic", "auto compaction mode: 'periodic' or 'revision'")
- fs.StringVar(&s.autoCompactionRetention,
"etcd-auto-compaction-retention", "1h", "auto compaction retention: e.g. '1h',
'30m', '24h' for periodic; '1000' for revision")
- fs.StringVar(&s.defragCron, "etcd-defrag-cron", "@daily",
"defragmentation cron: e.g. '@daily', '@hourly', '0 0 * * 0', '0 */6 * * *'")
- fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
- fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
- fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota
for backend storage")
- if s.propServer != nil {
- fs.AddFlagSet(s.propServer.FlagSet().FlagSet)
+ fs.StringVar(&s.schemaRegistryMode, "schema-registry-mode",
schemaTypeEtcd,
+ "Schema registry mode: 'etcd' for etcd-based storage,
'property' for property-based storage")
+ fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode",
metadata.NodeDiscoveryModeEtcd,
+ "Node discovery mode: 'etcd' for etcd-based, 'dns' for
DNS-based, 'file' for file-based")
+ if s.embedded {
+ fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the
root path of metadata")
+ fs.StringVar(&s.autoCompactionMode,
"etcd-auto-compaction-mode", "periodic", "auto compaction mode: 'periodic' or
'revision'")
+ fs.StringVar(&s.autoCompactionRetention,
"etcd-auto-compaction-retention", "1h",
+ "auto compaction retention: e.g. '1h', '30m', '24h' for
periodic; '1000' for revision")
+ fs.StringVar(&s.defragCron, "etcd-defrag-cron", "@daily",
+ "defragmentation cron: e.g. '@daily', '@hourly', '0 0 *
* 0', '0 */6 * * *'")
+ fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
+ fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+ fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "",
"Quota for backend storage")
}
- fs.AddFlagSet(s.Service.FlagSet().FlagSet)
- return fs
-}
-
-func (s *server) Validate() error {
- if err := s.Service.Validate(); err != nil {
- return err
+ if s.propServer == nil {
+ omr := s.omr
+ if omr == nil {
+ omr = observability.BypassRegistry
+ }
+ s.propServer = schemaserver.NewServer(omr)
}
- if s.schemaStorageType == schemaTypeProperty {
- if s.propServer != nil {
- return s.propServer.Validate()
+ if s.repairSvc == nil && s.pipelineClient != nil {
+ omr := s.omr
+ if omr == nil {
+ omr = observability.BypassRegistry
}
- return nil
+ s.repairSvc =
schemaserver.NewGossipService(s.propServer.RegisterGossip, s.Service,
s.pipelineClient, omr)
+ }
+ if s.serviceFlags == nil {
+ s.serviceFlags = s.Service.FlagSet()
}
- if s.rootDir == "" {
- return errors.New("rootDir is empty")
+ fs.AddFlagSet(s.propServer.FlagSet().FlagSet)
+ if s.repairSvc != nil {
+ fs.AddFlagSet(s.repairSvc.FlagSet().FlagSet)
}
- if s.listenClientURL == nil {
- return errors.New("listenClientURL is empty")
+ fs.AddFlagSet(s.serviceFlags.FlagSet)
+ return fs
+}
+
+func (s *server) Validate() error {
+ if s.serviceFlags == nil {
+ return errors.New("service flags are not initialized")
}
- if s.listenPeerURL == nil {
- return errors.New("listenPeerURL is empty")
+ if err := s.serviceFlags.Set("schema-registry-mode",
s.schemaRegistryMode); err != nil {
+ return err
}
Review Comment:
`Validate()` calls `s.serviceFlags.Set("schema-registry-mode", ...)`, but
`schema-registry-mode` is not defined on the underlying metadata client flagset
(`banyand/metadata/client.go`), so this will return an error and prevent the
service from starting. Either (1) add real `schema-registry-mode` support to
the underlying metadata client (and its FlagSet/Validate logic), or (2) stop
trying to propagate this wrapper-only flag into `serviceFlags` and keep
schema-registry-mode handling entirely within this wrapper.
```suggestion
```
##########
banyand/metadata/service/server.go:
##########
@@ -181,19 +260,79 @@ func (s *server) GracefulStop() {
}
// NewService returns a new metadata repository Service.
-func NewService(_ context.Context) (metadata.Service, error) {
+// When embedded is true (standalone mode), an embedded etcd server is started.
+// When embedded is false (data node mode), the service connects to external
etcd.
+func NewService(_ context.Context, embedded bool) (Service, error) {
s := &server{
- closer: run.NewCloser(0),
- propServer:
schemaserver.NewServer(observability.BypassRegistry),
+ closer: run.NewCloser(0),
+ embedded: embedded,
}
- var err error
- s.Service, err = metadata.NewClient(true, true)
- if err != nil {
- return nil, err
+ var clientErr error
+ if embedded {
+ s.Service, clientErr = metadata.NewClient(true, true)
+ } else {
+ s.Service, clientErr = metadata.NewClient(true, false)
+ }
+ if clientErr != nil {
+ return nil, clientErr
}
return s, nil
Review Comment:
`NewService` signature changed to `NewService(ctx, embedded bool)`, but
there are still call sites using the old signature which will fail compilation
(e.g. `banyand/liaison/grpc/server_test.go`,
`banyand/measure/cache_benchmark_test.go`,
`banyand/metadata/metadata_test.go`). Update those callers to pass the new
argument, or provide a backward-compatible wrapper (e.g. `NewService(ctx)`
delegating to `NewService(ctx, true)` for embedded mode).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]