hanahmily commented on code in PR #642:
URL:
https://github.com/apache/skywalking-banyandb/pull/642#discussion_r2048320207
##########
banyand/liaison/http/server.go:
##########
@@ -134,23 +146,148 @@ func (p *server) GetPort() *uint32 {
func (p *server) PreRun(_ context.Context) error {
p.l = logger.GetLogger(p.Name())
+ p.l.Info().Str("level", p.l.GetLevel().String()).Msg("Logger
initialized")
+
+ // Log flag values after parsing
+ p.l.Debug().Bool("tls", p.tls).Str("certFile",
p.certFile).Str("keyFile", p.keyFile).Msg("Flag values after parsing")
+
+ // Initialize TLSReloader if TLS is enabled
+ p.l.Debug().Bool("tls", p.tls).Msg("HTTP TLS flag is set")
+ if p.tls {
+ p.l.Debug().Str("certFile", p.certFile).Str("keyFile",
p.keyFile).Msg("Initializing TLSReloader for HTTP")
+ var err error
+ p.tlsReloader, err = pkgtls.NewReloader(p.certFile, p.keyFile,
p.l)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to initialize
TLSReloader for HTTP")
+ return err
+ }
+ } else {
+ p.l.Warn().Msg("HTTP TLS is disabled, skipping TLSReloader
initialization")
+ }
+
+ // Initialize gRPC client with cert file
+ if p.grpcCert != "" {
Review Comment:
Could you use the reloader for reloading? The reloader can act as either a
server or a client. If only credentials are passed to the reloader, it can
generate the client-side TLS configuration.
##########
banyand/liaison/http/server.go:
##########
@@ -134,23 +146,148 @@ func (p *server) GetPort() *uint32 {
func (p *server) PreRun(_ context.Context) error {
p.l = logger.GetLogger(p.Name())
+ p.l.Info().Str("level", p.l.GetLevel().String()).Msg("Logger
initialized")
+
+ // Log flag values after parsing
+ p.l.Debug().Bool("tls", p.tls).Str("certFile",
p.certFile).Str("keyFile", p.keyFile).Msg("Flag values after parsing")
+
+ // Initialize TLSReloader if TLS is enabled
+ p.l.Debug().Bool("tls", p.tls).Msg("HTTP TLS flag is set")
+ if p.tls {
+ p.l.Debug().Str("certFile", p.certFile).Str("keyFile",
p.keyFile).Msg("Initializing TLSReloader for HTTP")
+ var err error
+ p.tlsReloader, err = pkgtls.NewReloader(p.certFile, p.keyFile,
p.l)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to initialize
TLSReloader for HTTP")
+ return err
+ }
+ } else {
+ p.l.Warn().Msg("HTTP TLS is disabled, skipping TLSReloader
initialization")
+ }
+
+ // Initialize gRPC client with cert file
+ if p.grpcCert != "" {
+ p.l.Debug().Str("grpcCert", p.grpcCert).Msg("Initializing TLS
credentials for gRPC connection")
+
+ // Load the client cert directly - for client certs we don't
need a reloader
+ // as we only need the public cert to verify the server's
identity
+ cert, err := os.ReadFile(p.grpcCert)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to read gRPC cert
file")
+ return err
+ }
+
+ certPool := x509.NewCertPool()
+ if !certPool.AppendCertsFromPEM(cert) {
+ p.l.Error().Msg("Failed to append gRPC cert to pool")
+ return errors.New("failed to append gRPC cert to pool")
+ }
+
+ // Extract hostname from grpcAddr
+ host, _, err := net.SplitHostPort(p.grpcAddr)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to split gRPC address")
+ return err
+ }
+ if host == "" || host == "0.0.0.0" || host == "[::]" {
+ host = "localhost"
+ }
+
+ p.creds = credentials.NewTLS(&tls.Config{
Review Comment:
The new credentials are never applied to the client because the client's
credentials are created once, statically.
Your test passes, but it cannot verify that the credentials are updated. The
gRPC connection between the HTTP server and the gRPC server is long-lived. Even
if the server's credentials are updated, the connection is not closed and
re-established, which is why your test does not fail.
##########
test/integration/standalone/other/tls_test.go:
##########
@@ -74,4 +81,203 @@ var _ = g.Describe("Query service_cpm_minute", func() {
}, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
}, flags.EventuallyTimeout).Should(gm.Succeed())
})
+
+ g.It("queries an updated TLS server", func() {
+ // Create a temporary directory for certificate files
+ tempDir, err := os.MkdirTemp("", "tls-test-*")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer os.RemoveAll(tempDir)
+
+ // Copy the original certificate and key to the temporary
directory
+ tempCertFile := filepath.Join(tempDir, "cert.pem")
+ tempKeyFile := filepath.Join(tempDir, "key.pem")
+
+ // Read original certificate and key
+ originalCert, err := os.ReadFile(certFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ originalKey, err := os.ReadFile(keyFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Write to temporary location
+ err = os.WriteFile(tempCertFile, originalCert, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, originalKey, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Start a new server using the temporary certificate files
+ tempAddr, _, tempDeferFn :=
setup.StandaloneWithTLS(tempCertFile, tempKeyFile)
+ defer tempDeferFn()
+
+ // Create initial connection with the original certificate
+ creds, err := credentials.NewClientTLSFromFile(tempCertFile,
"localhost")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ tempConn, err := grpchelper.Conn(tempAddr, 10*time.Second,
grpclib.WithTransportCredentials(creds))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer tempConn.Close()
+
+ // Populate test data and verify with original certificate
connection first
+ ns := timestamp.NowMilli().UnixNano()
+ testBaseTime := time.Unix(0, ns-ns%int64(time.Minute))
+ casesMeasureData.Write(tempConn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", testBaseTime, interval)
+
+ // Verify using the initial connection before updating
certificates
+ gm.Eventually(func(innerGm gm.Gomega) {
+ casesMeasureData.VerifyFn(innerGm,
helpers.SharedContext{
+ Connection: tempConn,
+ BaseTime: testBaseTime,
+ }, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ // Generate a new certificate with a different CommonName
+ certPEM, keyPEM, err :=
tls.GenerateSelfSignedCert("updated-localhost", []string{"localhost"})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Update the certificate files in the temporary location
+ err = os.WriteFile(tempCertFile, certPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, keyPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Wait for the server to reload the certificates
+ time.Sleep(1 * time.Second)
+
+ // Create a new connection with the updated certificates
+ newCreds, err := credentials.NewClientTLSFromFile(tempCertFile,
"updated-localhost")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ newConn, err := grpchelper.Conn(tempAddr, 10*time.Second,
grpclib.WithTransportCredentials(newCreds))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer newConn.Close()
+
+ // Verify using the connection with new certificates
+ gm.Eventually(func(innerGm gm.Gomega) {
+ casesMeasureData.VerifyFn(innerGm,
helpers.SharedContext{
+ Connection: newConn,
+ BaseTime: testBaseTime,
+ }, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+ })
+
+ g.It("queries an updated HTTP server", func() {
Review Comment:
This test can replace the previous one, since it verifies both HTTP and
gRPC.
##########
banyand/liaison/http/server.go:
##########
@@ -134,23 +146,148 @@ func (p *server) GetPort() *uint32 {
func (p *server) PreRun(_ context.Context) error {
p.l = logger.GetLogger(p.Name())
+ p.l.Info().Str("level", p.l.GetLevel().String()).Msg("Logger
initialized")
+
+ // Log flag values after parsing
+ p.l.Debug().Bool("tls", p.tls).Str("certFile",
p.certFile).Str("keyFile", p.keyFile).Msg("Flag values after parsing")
+
+ // Initialize TLSReloader if TLS is enabled
+ p.l.Debug().Bool("tls", p.tls).Msg("HTTP TLS flag is set")
+ if p.tls {
+ p.l.Debug().Str("certFile", p.certFile).Str("keyFile",
p.keyFile).Msg("Initializing TLSReloader for HTTP")
+ var err error
+ p.tlsReloader, err = pkgtls.NewReloader(p.certFile, p.keyFile,
p.l)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to initialize
TLSReloader for HTTP")
+ return err
+ }
+ } else {
+ p.l.Warn().Msg("HTTP TLS is disabled, skipping TLSReloader
initialization")
+ }
+
+ // Initialize gRPC client with cert file
+ if p.grpcCert != "" {
+ p.l.Debug().Str("grpcCert", p.grpcCert).Msg("Initializing TLS
credentials for gRPC connection")
+
+ // Load the client cert directly - for client certs we don't
need a reloader
+ // as we only need the public cert to verify the server's
identity
+ cert, err := os.ReadFile(p.grpcCert)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to read gRPC cert
file")
+ return err
+ }
+
+ certPool := x509.NewCertPool()
+ if !certPool.AppendCertsFromPEM(cert) {
+ p.l.Error().Msg("Failed to append gRPC cert to pool")
+ return errors.New("failed to append gRPC cert to pool")
+ }
+
+ // Extract hostname from grpcAddr
+ host, _, err := net.SplitHostPort(p.grpcAddr)
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to split gRPC address")
+ return err
+ }
+ if host == "" || host == "0.0.0.0" || host == "[::]" {
+ host = "localhost"
+ }
+
+ p.creds = credentials.NewTLS(&tls.Config{
+ RootCAs: certPool,
+ ServerName: host,
+ MinVersion: tls.VersionTLS12,
+ })
+
+ // Set up a file watcher to monitor certificate changes
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ p.l.Error().Err(err).Msg("Failed to create watcher for
gRPC cert")
+ return err
+ }
+
+ if err = watcher.Add(p.grpcCert); err != nil {
+ p.l.Error().Err(err).Msg("Failed to watch gRPC cert
file")
+ watcher.Close()
+ return err
+ }
+
+ p.certWatcher = watcher
+
+ // Start a goroutine to watch for certificate changes
+ go func() {
+ for {
+ select {
+ case event, ok := <-p.certWatcher.Events:
+ if !ok {
+ return
+ }
+ if event.Has(fsnotify.Write) ||
event.Has(fsnotify.Create) {
Review Comment:
The "Remove" event is ignored. This is another reason to use the reloader,
which already supports handling this event.
##########
test/integration/standalone/other/tls_test.go:
##########
@@ -74,4 +81,203 @@ var _ = g.Describe("Query service_cpm_minute", func() {
}, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
}, flags.EventuallyTimeout).Should(gm.Succeed())
})
+
+ g.It("queries an updated TLS server", func() {
+ // Create a temporary directory for certificate files
+ tempDir, err := os.MkdirTemp("", "tls-test-*")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer os.RemoveAll(tempDir)
+
+ // Copy the original certificate and key to the temporary
directory
+ tempCertFile := filepath.Join(tempDir, "cert.pem")
+ tempKeyFile := filepath.Join(tempDir, "key.pem")
+
+ // Read original certificate and key
+ originalCert, err := os.ReadFile(certFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ originalKey, err := os.ReadFile(keyFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Write to temporary location
+ err = os.WriteFile(tempCertFile, originalCert, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, originalKey, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Start a new server using the temporary certificate files
+ tempAddr, _, tempDeferFn :=
setup.StandaloneWithTLS(tempCertFile, tempKeyFile)
+ defer tempDeferFn()
+
+ // Create initial connection with the original certificate
+ creds, err := credentials.NewClientTLSFromFile(tempCertFile,
"localhost")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ tempConn, err := grpchelper.Conn(tempAddr, 10*time.Second,
grpclib.WithTransportCredentials(creds))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer tempConn.Close()
+
+ // Populate test data and verify with original certificate
connection first
+ ns := timestamp.NowMilli().UnixNano()
+ testBaseTime := time.Unix(0, ns-ns%int64(time.Minute))
+ casesMeasureData.Write(tempConn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", testBaseTime, interval)
+
+ // Verify using the initial connection before updating
certificates
+ gm.Eventually(func(innerGm gm.Gomega) {
+ casesMeasureData.VerifyFn(innerGm,
helpers.SharedContext{
+ Connection: tempConn,
+ BaseTime: testBaseTime,
+ }, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ // Generate a new certificate with a different CommonName
+ certPEM, keyPEM, err :=
tls.GenerateSelfSignedCert("updated-localhost", []string{"localhost"})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Update the certificate files in the temporary location
+ err = os.WriteFile(tempCertFile, certPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, keyPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Wait for the server to reload the certificates
+ time.Sleep(1 * time.Second)
+
+ // Create a new connection with the updated certificates
+ newCreds, err := credentials.NewClientTLSFromFile(tempCertFile,
"updated-localhost")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ newConn, err := grpchelper.Conn(tempAddr, 10*time.Second,
grpclib.WithTransportCredentials(newCreds))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer newConn.Close()
+
+ // Verify using the connection with new certificates
+ gm.Eventually(func(innerGm gm.Gomega) {
+ casesMeasureData.VerifyFn(innerGm,
helpers.SharedContext{
+ Connection: newConn,
+ BaseTime: testBaseTime,
+ }, helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute})
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+ })
+
+ g.It("queries an updated HTTP server", func() {
+ // Create a temporary directory for certificate files
+ tempDir, err := os.MkdirTemp("", "http-tls-test-*")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer os.RemoveAll(tempDir)
+
+ // Copy the original certificate and key to the temporary
directory
+ tempCertFile := filepath.Join(tempDir, "cert.pem")
+ tempKeyFile := filepath.Join(tempDir, "key.pem")
+
+ // Read original certificate and key
+ originalCert, err := os.ReadFile(certFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ originalKey, err := os.ReadFile(keyFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Write to temporary location
+ err = os.WriteFile(tempCertFile, originalCert, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, originalKey, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Start a new server using the temporary certificate files
+ grpcAddr, httpAddr, tempDeferFn :=
setup.StandaloneWithTLS(tempCertFile, tempKeyFile)
+ defer tempDeferFn()
+
+ // Create a secure HTTP client with the initial certificates
+ httpClient := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &cryptotls.Config{
+ InsecureSkipVerify: false,
+ RootCAs: x509.NewCertPool(),
+ MinVersion:
cryptotls.VersionTLS12,
+ },
+ },
+ }
+ // Add the cert to the pool
+ certPool :=
httpClient.Transport.(*http.Transport).TLSClientConfig.RootCAs
+ certPool.AppendCertsFromPEM(originalCert)
+
+ // Verify HTTP server is working with original certificates
+ healthCheckURL := fmt.Sprintf("https://%s/api/healthz",
httpAddr)
+ gm.Eventually(func() error {
+ var respErr error
+ resp, respErr := httpClient.Get(healthCheckURL)
+ if respErr != nil {
+ return respErr
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("unexpected status code: %d",
resp.StatusCode)
+ }
+ return nil
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ // Create a gRPC connection to populate test data
+ creds, err := credentials.NewClientTLSFromFile(tempCertFile,
"localhost")
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ grpcConn, err := grpchelper.Conn(grpcAddr, 10*time.Second,
grpclib.WithTransportCredentials(creds))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer grpcConn.Close()
+
+ // Populate test data
+ ns := timestamp.NowMilli().UnixNano()
+ testBaseTime := time.Unix(0, ns-ns%int64(time.Minute))
+ casesMeasureData.Write(grpcConn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", testBaseTime, interval)
+
+ // Generate a new certificate with a different CommonName
+ certPEM, keyPEM, err :=
tls.GenerateSelfSignedCert("updated-localhost", []string{"localhost"})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Update the certificate files in the temporary location
+ err = os.WriteFile(tempCertFile, certPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(tempKeyFile, keyPEM, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // Wait for the server to reload the certificates
+ time.Sleep(1 * time.Second)
+
+ // Create a new HTTP client with updated certificates
+ newHTTPClient := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &cryptotls.Config{
+ InsecureSkipVerify: false,
+ RootCAs: x509.NewCertPool(),
+ MinVersion:
cryptotls.VersionTLS12,
+ },
+ },
+ }
+ // Add the new cert to the pool
+ newCertPool :=
newHTTPClient.Transport.(*http.Transport).TLSClientConfig.RootCAs
+ newCertPool.AppendCertsFromPEM(certPEM)
+
+ // Verify HTTP server works with the new certificates
+ gm.Eventually(func() error {
+ var respErr2 error
+ resp, respErr2 := newHTTPClient.Get(healthCheckURL)
Review Comment:
Since the health check uses a long-lived connection, please verify the HTTP
server with another endpoint instead:
https://github.com/apache/skywalking-banyandb/blob/main/api/proto/banyandb/database/v1/rpc.proto#L104
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]