hanahmily commented on code in PR #920:
URL: https://github.com/apache/skywalking-banyandb/pull/920#discussion_r2659284442


##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)

Review Comment:
   Avoid defining this interface, since it is only used within the same package.



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()

Review Comment:
   The `lastSuccessMutex` field appears to be redundant, as `lastSuccessfulDNS` is accessed only by the discovery loop.



##########
banyand/metadata/client.go:
##########
@@ -96,13 +108,35 @@ func (s *clientService) FlagSet() *run.FlagSet {
        fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key 
for the etcd client certificate.")
        fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 
2*time.Minute, "The timeout for the node registry")
        fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 
30*time.Minute, "The interval for full sync etcd")
+
+       // DNS-based node discovery configuration
+       fs.BoolVar(&s.dnsRegistryEnabled, "node-registry-dns-enabled", false,

Review Comment:
   Use a single flag, "node-discovery-mode", with three options: "dns", "file", and "etcd". Replace "registry" with "discovery" in the flag names, as it is the more appropriate term.



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")

Review Comment:
   ```suggestion
                svc.log.Info().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
   ```



##########
banyand/metadata/client.go:
##########
@@ -96,13 +108,35 @@ func (s *clientService) FlagSet() *run.FlagSet {
        fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key 
for the etcd client certificate.")
        fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 
2*time.Minute, "The timeout for the node registry")
        fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 
30*time.Minute, "The interval for full sync etcd")
+
+       // DNS-based node discovery configuration
+       fs.BoolVar(&s.dnsRegistryEnabled, "node-registry-dns-enabled", false,
+               "Is DNS registry enabled")
+       fs.StringSliceVar(&s.dnsSRVAddresses, 
"node-registry-dns-srv-addresses", []string{},
+               "DNS SRV addresses for node discovery (e.g., 
_grpc._tcp.banyandb.svc.cluster.local)")

Review Comment:
   ```suggestion
                "DNS SRV addresses for node discovery (e.g., 
banyandb.svc.cluster.local)")
   ```
   
   The "_grpc._tcp" prefix should be reserved and not exposed through the flag.



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)

Review Comment:
   You did not start the reloader; it cannot monitor the certificate file for changes until it is started.



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+// queryAllSRVRecords resolves every configured SRV name and returns the
+// de-duplicated "target:port" addresses in no particular order. Resolution is
+// all-or-nothing: if any name fails, the joined lookup errors are returned with
+// no addresses, so the caller falls back to its last complete answer instead
+// of reconciling the cache against a partial one.
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       begin := time.Now()
+       defer func() {
+               if s.metrics == nil {
+                       return
+               }
+               elapsed := time.Since(begin)
+               s.metrics.dnsQueryCount.Inc(1)
+               s.metrics.dnsQueryDuration.Observe(elapsed.Seconds())
+               s.metrics.dnsQueryTotalDuration.Inc(elapsed.Seconds())
+       }()
+
+       seen := make(map[string]bool)
+       var failures []error
+       for _, name := range s.srvAddresses {
+               _, records, err := s.resolver.LookupSRV(ctx, name)
+               if err != nil {
+                       failures = append(failures, fmt.Errorf("lookup %s failed: %w", name, err))
+                       continue
+               }
+               for _, rec := range records {
+                       seen[fmt.Sprintf("%s:%d", rec.Target, rec.Port)] = true
+               }
+       }
+
+       if len(failures) != 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(failures...)
+       }
+
+       out := make([]string, 0, len(seen))
+       for address := range seen {
+               out = append(out, address)
+       }
+       return out, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, addr)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("address", addr).
+                                       Msg("Failed to fetch node metadata")
+                               addErrors = append(addErrors, fetchErr)
+                               continue
+                       }
+
+                       // update cache and notify handlers
+                       s.cacheMutex.Lock()
+                       s.nodeCache[addr] = node
+                       s.cacheMutex.Unlock()
+
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, true)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("New node discovered and added to cache")
+               }
+       }
+
+       s.cacheMutex.Lock()
+       for addr, node := range s.nodeCache {
+               if !addressSet[addr] {
+                       // update cache and notify the handlers
+                       delete(s.nodeCache, addr)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("Node removed from cache (no longer in 
DNS)")
+
+                       s.cacheMutex.Unlock()
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, false)
+                       s.cacheMutex.Lock()
+               }
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()

Review Comment:
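   Copilot is correct; please fix this bug. The removal loop ranges over `s.nodeCache` but releases `cacheMutex` around each `notifyHandlers` call, so another goroutine can modify the map mid-iteration. Collect the removed nodes while holding the lock, then notify the handlers after unlocking.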



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+//
+// Each discovery round resolves the configured SRV names, fetches node
+// metadata over gRPC from addresses that are not cached yet, stores it in
+// nodeCache keyed by "target:port" address, and notifies the registered
+// handlers when nodes are added or removed. Locking:
+//   - cacheMutex guards nodeCache;
+//   - lastSuccessMutex guards lastSuccessfulDNS, the fallback address list
+//     used when a later DNS lookup fails;
+//   - lastQueryMutex guards lastQueryTime;
+//   - handlersMutex presumably guards handlers (its use is not visible here).
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+//
+// SRVAddresses must be non-empty; NewService rejects an empty list.
+// InitInterval is the delay between discovery rounds during the first
+// InitDuration after the discovery loop starts, and PollInterval is used
+// afterwards. GRPCTimeout bounds each metadata fetch from a discovered node.
+// When TLSEnabled is set, a non-empty CACertPath names the CA certificate used
+// to build the client TLS credentials for those fetches.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+//
+// LookupSRV resolves name and returns the canonical name, the SRV records and
+// an error, mirroring net.Resolver.LookupSRV. The interface exists as a seam so
+// tests can substitute a fake resolver (see newServiceWithResolver).
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver is the production Resolver, backed by net.DefaultResolver.
+type defaultResolver struct{}
+
+// LookupSRV queries name as a complete SRV name: with empty service and proto
+// arguments, net.Resolver.LookupSRV looks the name up directly.
+func (*defaultResolver) LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error) {
+       sysResolver := net.DefaultResolver
+       return sysResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver builds a Service exactly like NewService and then
+// replaces its resolver, so tests can inject a fake Resolver.
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err == nil {
+               svc.resolver = resolver
+               return svc, nil
+       }
+       return nil, err
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start launches the discovery loop on a new goroutine and returns nil right
+// away; errors from individual discovery rounds are logged by the loop rather
+// than returned here.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+       go s.discoveryLoop(ctx)
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+// queryAllSRVRecords resolves every configured SRV name and returns the
+// de-duplicated "target:port" addresses in no particular order. Resolution is
+// all-or-nothing: if any name fails, the joined lookup errors are returned with
+// no addresses, so the caller falls back to its last complete answer instead
+// of reconciling the cache against a partial one.
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       begin := time.Now()
+       defer func() {
+               if s.metrics == nil {
+                       return
+               }
+               elapsed := time.Since(begin)
+               s.metrics.dnsQueryCount.Inc(1)
+               s.metrics.dnsQueryDuration.Observe(elapsed.Seconds())
+               s.metrics.dnsQueryTotalDuration.Inc(elapsed.Seconds())
+       }()
+
+       seen := make(map[string]bool)
+       var failures []error
+       for _, name := range s.srvAddresses {
+               _, records, err := s.resolver.LookupSRV(ctx, name)
+               if err != nil {
+                       failures = append(failures, fmt.Errorf("lookup %s failed: %w", name, err))
+                       continue
+               }
+               for _, rec := range records {
+                       seen[fmt.Sprintf("%s:%d", rec.Target, rec.Port)] = true
+               }
+       }
+
+       if len(failures) != 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(failures...)
+       }
+
+       out := make([]string, 0, len(seen))
+       for address := range seen {
+               out = append(out, address)
+       }
+       return out, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC

Review Comment:
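   Please fix this: the existence check runs under a read lock that is released before the fetch and the insert. If `updateNodeCache` can run concurrently, two callers can both fetch the same address and notify the handlers twice. Re-check the cache under the write lock before inserting.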



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+//
+// Each discovery round resolves the configured SRV names, fetches node
+// metadata over gRPC from addresses that are not cached yet, stores it in
+// nodeCache keyed by "target:port" address, and notifies the registered
+// handlers when nodes are added or removed. Locking:
+//   - cacheMutex guards nodeCache;
+//   - lastSuccessMutex guards lastSuccessfulDNS, the fallback address list
+//     used when a later DNS lookup fails;
+//   - lastQueryMutex guards lastQueryTime;
+//   - handlersMutex presumably guards handlers (its use is not visible here).
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+//
+// SRVAddresses must be non-empty; NewService rejects an empty list.
+// InitInterval is the delay between discovery rounds during the first
+// InitDuration after the discovery loop starts, and PollInterval is used
+// afterwards. GRPCTimeout bounds each metadata fetch from a discovered node.
+// When TLSEnabled is set, a non-empty CACertPath names the CA certificate used
+// to build the client TLS credentials for those fetches.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+//
+// LookupSRV resolves name and returns the canonical name, the SRV records and
+// an error, mirroring net.Resolver.LookupSRV. The interface exists as a seam so
+// tests can substitute a fake resolver (see newServiceWithResolver).
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver is the production Resolver, backed by net.DefaultResolver.
+type defaultResolver struct{}
+
+// LookupSRV queries name as a complete SRV name: with empty service and proto
+// arguments, net.Resolver.LookupSRV looks the name up directly.
+func (*defaultResolver) LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error) {
+       sysResolver := net.DefaultResolver
+       return sysResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver builds a Service exactly like NewService and then
+// replaces its resolver, so tests can inject a fake Resolver.
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err == nil {
+               svc.resolver = resolver
+               return svc, nil
+       }
+       return nil, err
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start launches the discovery loop on a new goroutine and returns nil right
+// away; errors from individual discovery rounds are logged by the loop rather
+// than returned here.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+       go s.discoveryLoop(ctx)
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+// queryAllSRVRecords resolves every configured SRV name and returns the
+// de-duplicated "target:port" addresses in no particular order. Resolution is
+// all-or-nothing: if any name fails, the joined lookup errors are returned with
+// no addresses, so the caller falls back to its last complete answer instead
+// of reconciling the cache against a partial one.
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       begin := time.Now()
+       defer func() {
+               if s.metrics == nil {
+                       return
+               }
+               elapsed := time.Since(begin)
+               s.metrics.dnsQueryCount.Inc(1)
+               s.metrics.dnsQueryDuration.Observe(elapsed.Seconds())
+               s.metrics.dnsQueryTotalDuration.Inc(elapsed.Seconds())
+       }()
+
+       seen := make(map[string]bool)
+       var failures []error
+       for _, name := range s.srvAddresses {
+               _, records, err := s.resolver.LookupSRV(ctx, name)
+               if err != nil {
+                       failures = append(failures, fmt.Errorf("lookup %s failed: %w", name, err))
+                       continue
+               }
+               for _, rec := range records {
+                       seen[fmt.Sprintf("%s:%d", rec.Target, rec.Port)] = true
+               }
+       }
+
+       if len(failures) != 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(failures...)
+       }
+
+       out := make([]string, 0, len(seen))
+       for address := range seen {
+               out = append(out, address)
+       }
+       return out, nil
+}
+
+// updateNodeCache reconciles nodeCache with the given address list.
+//
+// Addresses that are not cached yet are fetched over gRPC. On success, each
+// node is inserted and announced to the handlers as an addition. Cached
+// addresses missing from the list are evicted and announced as removals.
+// Handlers are always invoked without cacheMutex held, and the cache map is
+// never iterated while the lock is released. Fetch failures are logged and
+// returned joined; they do not stop the remaining addresses from being
+// processed, nor stale nodes from being removed.
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) error {
+       addressSet := make(map[string]bool, len(addresses))
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+               if exists {
+                       continue
+               }
+
+               // fetch node metadata over gRPC without holding cacheMutex, since the
+               // call can block for up to grpcTimeout
+               node, fetchErr := s.fetchNodeMetadata(ctx, addr)
+               if fetchErr != nil {
+                       s.log.Warn().
+                               Err(fetchErr).
+                               Str("address", addr).
+                               Msg("Failed to fetch node metadata")
+                       addErrors = append(addErrors, fetchErr)
+                       continue
+               }
+
+               // re-check under the write lock: a concurrent update may have cached
+               // this address while the fetch was in flight, and a node must only be
+               // inserted and announced once
+               s.cacheMutex.Lock()
+               _, exists = s.nodeCache[addr]
+               if !exists {
+                       s.nodeCache[addr] = node
+               }
+               s.cacheMutex.Unlock()
+               if exists {
+                       continue
+               }
+
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, true)
+
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("New node discovered and added to cache")
+       }
+
+       // evict stale entries while holding the lock, but defer the handler
+       // notifications until it is released: unlocking in the middle of a range
+       // over nodeCache would let other goroutines mutate the map mid-iteration
+       var removed []*databasev1.Node
+       s.cacheMutex.Lock()
+       for addr, node := range s.nodeCache {
+               if addressSet[addr] {
+                       continue
+               }
+               delete(s.nodeCache, addr)
+               removed = append(removed, node)
+
+               s.log.Debug().
+                       Str("address", addr).
+                       Str("name", node.GetMetadata().GetName()).
+                       Msg("Node removed from cache (no longer in DNS)")
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       for _, node := range removed {
+               s.notifyHandlers(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                               Name: node.GetMetadata().GetName(),
+                       },
+                       Spec: node,
+               }, false)
+       }
+
+       // update total nodes metric
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+
+       if len(addErrors) > 0 {
+               return errors.Join(addErrors...)
+       }
+
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, address string) 
(*databasev1.Node, error) {
+       // Record gRPC query metrics
+       startTime := time.Now()
+       var grpcErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.grpcQueryCount.Inc(1)
+                       s.metrics.grpcQueryDuration.Observe(duration.Seconds())
+                       s.metrics.grpcQueryTotalDuration.Inc(duration.Seconds())
+                       if grpcErr != nil {
+                               s.metrics.grpcQueryFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // for TLS connections with other nodes to getting matadata
+       dialOpts, err := s.getTLSDialOptions()
+       if err != nil {
+               grpcErr = fmt.Errorf("failed to get TLS dial options: %w", err)
+               return nil, grpcErr
+       }
+       // nolint:contextcheck
+       conn, connErr := grpchelper.ConnWithAuth(address, s.grpcTimeout, "", 
"", dialOpts...)

Review Comment:
   How is basic auth (username/password) handled when authentication is enabled on the liaison node? This call passes empty strings for the credential arguments of `grpchelper.ConnWithAuth`.



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)

Review Comment:
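   Use a `time.Timer` instead of `time.Sleep`, and wait on the timer inside a `select` 
together with `ctx.Done()` and `s.closer.CloseNotify()`. That way a cancellation or close event is handled immediately instead of being delayed until the sleep finishes.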



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to