This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8c647e99 internal TLS reload (#882)
8c647e99 is described below
commit 8c647e999b4f31f26de94007a384b1f45fb15b4d
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Dec 8 14:23:02 2025 +0800
internal TLS reload (#882)
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
banyand/queue/pub/pub.go | 100 +++++++++++++++++++++++++++++++++++++++++++-
banyand/queue/sub/server.go | 50 +++++++++++++++++++---
docs/operation/security.md | 22 ++++++++--
3 files changed, 161 insertions(+), 11 deletions(-)
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index fd69d37a..26d5320f 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -30,6 +30,7 @@ import (
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@@ -47,6 +48,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
)
// ChunkedSyncClientConfig configures chunked sync client behavior.
@@ -75,6 +77,7 @@ type pub struct {
writableProbe map[string]map[string]struct{}
cbStates map[string]*circuitState
caCertPath string
+ caCertReloader *pkgtls.Reloader
prefix string
retryPolicy string
allowedRoles []databasev1.Role
@@ -100,7 +103,7 @@ func (p *pub) FlagSet() *run.FlagSet {
func (p *pub) Validate() error {
// simple sanity‑check: if TLS is on, a CA bundle must be provided
if p.tlsEnabled && p.caCertPath == "" {
- return fmt.Errorf("TLS is enabled (--internal-tls), but no CA
certificate file was provided (--internal-ca-cert is required)")
+ return fmt.Errorf("TLS is enabled (--data-client-tls), but no
CA certificate file was provided (--data-client-ca-cert is required)")
}
return nil
}
@@ -110,6 +113,11 @@ func (p *pub) Register(topic bus.Topic, handler
schema.EventHandler) {
}
func (p *pub) GracefulStop() {
+ // Stop CA certificate reloader if enabled
+ if p.caCertReloader != nil {
+ p.caCertReloader.Stop()
+ }
+
p.mu.Lock()
defer p.mu.Unlock()
for i := range p.evictable {
@@ -126,6 +134,35 @@ func (p *pub) GracefulStop() {
// Serve implements run.Service.
func (p *pub) Serve() run.StopNotify {
+ // Start CA certificate reloader if enabled
+ if p.caCertReloader != nil {
+ if err := p.caCertReloader.Start(); err != nil {
+ p.log.Error().Err(err).Msg("Failed to start CA
certificate reloader")
+ stopCh := p.closer.CloseNotify()
+ return stopCh
+ }
+ p.log.Info().Str("caCertPath", p.caCertPath).Msg("Started CA
certificate file monitoring")
+
+ // Listen for certificate update events
+ certUpdateCh := p.caCertReloader.GetUpdateChannel()
+ stopCh := p.closer.CloseNotify()
+ if p.closer.AddRunning() {
+ go func() {
+ defer p.closer.Done()
+ for {
+ select {
+ case <-certUpdateCh:
+ p.log.Info().Msg("CA
certificate updated, reconnecting clients")
+ p.reconnectAllClients()
+ case <-stopCh:
+ return
+ }
+ }
+ }()
+ }
+ return stopCh
+ }
+
return p.closer.CloseNotify()
}
@@ -332,6 +369,17 @@ func (p *pub) PreRun(context.Context) error {
}
p.log = logger.GetLogger("server-queue-pub-" + p.prefix)
+
+ // Initialize CA certificate reloader if TLS is enabled and CA cert
path is provided
+ if p.tlsEnabled && p.caCertPath != "" {
+ var err error
+ p.caCertReloader, err =
pkgtls.NewClientCertReloader(p.caCertPath, p.log)
+ if err != nil {
+ return errors.Wrapf(err, "failed to initialize CA
certificate reloader for %s", p.prefix)
+ }
+ p.log.Info().Str("caCertPath", p.caCertPath).Msg("Initialized
CA certificate reloader")
+ }
+
return nil
}
@@ -441,6 +489,23 @@ func isFailoverError(err error) bool {
}
func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+ if !p.tlsEnabled {
+ return grpchelper.SecureOptions(nil, false, false, "")
+ }
+
+ // Use reloader if available (for dynamic reloading)
+ if p.caCertReloader != nil {
+ // Extract server name from the connection (we'll use a default
for now)
+ // The actual server name will be validated by the TLS handshake
+ tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+ if err != nil {
+ return nil, fmt.Errorf("failed to get TLS config from
reloader: %w", err)
+ }
+ creds := credentials.NewTLS(tlsConfig)
+ return []grpc.DialOption{grpc.WithTransportCredentials(creds)},
nil
+ }
+
+ // Fallback to static file reading if reloader is not available
opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false,
p.caCertPath)
if err != nil {
return nil, fmt.Errorf("failed to load TLS config: %w", err)
@@ -448,6 +513,39 @@ func (p *pub) getClientTransportCredentials()
([]grpc.DialOption, error) {
return opts, nil
}
+// reconnectAllClients reconnects all active clients when CA certificate is
updated.
+func (p *pub) reconnectAllClients() {
+ // Collect nodes and close connections
+ p.mu.Lock()
+ nodesToReconnect := make([]schema.Metadata, 0, len(p.registered))
+ for name, node := range p.registered {
+ // Handle evictable nodes: close channel and remove from
evictable
+ if en, ok := p.evictable[name]; ok {
+ close(en.c)
+ delete(p.evictable, name)
+ }
+ // Handle active nodes: close connection and remove from active
+ if client, ok := p.active[name]; ok {
+ _ = client.conn.Close()
+ delete(p.active, name)
+ p.deleteClient(client.md)
+ }
+ md := schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ },
+ Spec: node,
+ }
+ nodesToReconnect = append(nodesToReconnect, md)
+ }
+ p.mu.Unlock()
+
+ // Reconnect with new credentials
+ for _, md := range nodesToReconnect {
+ p.OnAddOrUpdate(md)
+ }
+}
+
// NewChunkedSyncClient implements queue.Client.
func (p *pub) NewChunkedSyncClient(node string, chunkSize uint32)
(queue.ChunkedSyncClient, error) {
return p.NewChunkedSyncClientWithConfig(node, &ChunkedSyncClientConfig{
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index c1949b20..05ca5507 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -51,6 +51,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
)
const defaultRecvSize = 10 << 20
@@ -72,6 +73,7 @@ type server struct {
streamv1.UnimplementedStreamServiceServer
databasev1.UnimplementedSnapshotServiceServer
creds credentials.TransportCredentials
+ tlsReloader *pkgtls.Reloader
omr observability.MetricsRegistry
metrics *metrics
ser *grpclib.Server
@@ -126,6 +128,17 @@ func NewServerWithPorts(omr observability.MetricsRegistry,
flagNamePrefix string
func (s *server) PreRun(_ context.Context) error {
s.log = logger.GetLogger("server-queue-sub")
s.metrics = newMetrics(s.omr.With(queueSubScope))
+
+ // Initialize TLS reloader if TLS is enabled
+ if s.tls {
+ var err error
+ s.tlsReloader, err = pkgtls.NewReloader(s.certFile, s.keyFile,
s.log)
+ if err != nil {
+ return errors.Wrap(err, "failed to initialize TLS
reloader for queue server")
+ }
+ s.log.Info().Str("certFile", s.certFile).Str("keyFile",
s.keyFile).Msg("Initialized TLS reloader for queue server")
+ }
+
return nil
}
@@ -188,18 +201,37 @@ func (s *server) Validate() error {
if s.keyFile == "" {
return errServerKey
}
- creds, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
- if errTLS != nil {
- return errors.Wrap(errTLS, "failed to load cert and key")
- }
- s.creds = creds
+ // TLS reloader will be initialized in PreRun, so we don't need to load
credentials here
+ // The credentials will be loaded dynamically when the server starts
return nil
}
func (s *server) Serve() run.StopNotify {
var opts []grpclib.ServerOption
if s.tls {
- opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
+ // Start TLS reloader if enabled
+ if s.tlsReloader != nil {
+ if err := s.tlsReloader.Start(); err != nil {
+ s.log.Error().Err(err).Msg("Failed to start TLS
reloader for queue server")
+ stopCh := make(chan struct{})
+ close(stopCh)
+ return stopCh
+ }
+ s.log.Info().Str("certFile", s.certFile).Str("keyFile",
s.keyFile).Msg("Started TLS file monitoring for queue server")
+ tlsConfig := s.tlsReloader.GetTLSConfig()
+ creds := credentials.NewTLS(tlsConfig)
+ opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+ } else {
+ // Fallback to static loading if reloader is not
available
+ creds, errTLS :=
credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+ if errTLS != nil {
+ s.log.Error().Err(errTLS).Msg("Failed to load
TLS credentials")
+ stopCh := make(chan struct{})
+ close(stopCh)
+ return stopCh
+ }
+ opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+ }
}
grpcPanicRecoveryHandler := func(p any) (err error) {
s.log.Error().Interface("panic", p).Str("stack",
string(debug.Stack())).Msg("recovered from panic")
@@ -290,6 +322,12 @@ func (s *server) Serve() run.StopNotify {
func (s *server) GracefulStop() {
s.log.Info().Msg("stopping")
+
+ // Stop TLS reloader if enabled
+ if s.tlsReloader != nil {
+ s.tlsReloader.Stop()
+ }
+
stopped := make(chan struct{})
s.clientCloser()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
diff --git a/docs/operation/security.md b/docs/operation/security.md
index 748e8dd4..59ad6c7e 100644
--- a/docs/operation/security.md
+++ b/docs/operation/security.md
@@ -123,19 +123,33 @@ BanyanDB supports enabling TLS for the internal gRPC
queue between liaison and d
The following flags are used to configure internal TLS:
-- `--internal-tls`: Enable TLS on the internal queue client inside Liaison; if
false, the queue uses plain TCP.
-- `--internal-ca-cert <path>`: PEM‑encoded CA (or bundle) that the queue
client uses to verify Data‑Node server certificates.
+- `--data-client-tls`: Enable TLS on the internal queue client inside Liaison;
if false, the queue uses plain TCP.
+- `--data-client-ca-cert`: PEM‑encoded CA (or bundle) that the queue client
uses to verify Data‑Node server certificates.
Each Liaison/Data process still advertises its certificate with the public
flags (`--tls`, `--cert-file`, `--key-file`). The same certificate/key pair can
be reused for both external traffic and the internal queue.
**Example: Enable internal TLS between liaison and data nodes**
```shell
-banyand liaison --internal-tls=true --internal-ca-cert=ca.crt --tls=true
--cert-file=server.crt --key-file=server.key
+banyand liaison --data-client-tls=true --data-client-ca-cert=ca.crt --tls=true
--cert-file=server.crt --key-file=server.key
banyand data --tls=true --cert-file=server.crt --key-file=server.key
```
-> Note: The `--internal-ca-cert` should point to the CA certificate used to
sign the data node's server certificate.
+> Note:
+> - The `--data-client-ca-cert` should point to the CA certificate used to
sign the data node's server certificate.
+> - Data nodes act as servers and do not need a CA certificate to connect to
liaison nodes (liaison nodes connect to data nodes, not vice versa).
+> - The flag names use the prefix "data" because liaison nodes connect to data
nodes. The actual flag names are `--data-client-tls` and
`--data-client-ca-cert`.
+
+**Dynamic Certificate Reloading**
+
+All certificates used for internal TLS can be reloaded automatically when they
are updated:
+
+- **Liaison nodes**:
+ - CA certificate file (`--data-client-ca-cert`): Can be updated, and the
liaison node will automatically reload it and reconnect all clients to data
nodes with the new certificate.
+ - Server certificate files (`--cert-file`, `--key-file`): Can be updated,
and the server will automatically reload them. These certificates are used for
external client connections and can also be reused for internal queue
communication.
+- **Data nodes**: The server certificate files (`--cert-file`, `--key-file`)
can be updated, and the server will automatically reload them without requiring
a restart.
+
+You can either modify the files in place or replace them with newly created
files; in both cases the servers will automatically reload them.
## Authorization