This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 248b052a Fix the data race (#673)
248b052a is described below

commit 248b052a319eeefa880e6edd1c307ffaf2760a34
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue May 20 15:45:26 2025 +0800

    Fix the data race (#673)
---
 banyand/liaison/http/server.go | 48 +++++++++++++++++++++++++++++-------------
 1 file changed, 33 insertions(+), 15 deletions(-)

diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 8f23fc77..a4e5fd05 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -25,6 +25,7 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync"
        "sync/atomic"
        "time"
 
@@ -71,22 +72,23 @@ type Server interface {
 
 type server struct {
        creds           credentials.TransportCredentials
-       tlsReloader     *pkgtls.Reloader
-       grpcTLSReloader *pkgtls.Reloader
-       l               *logger.Logger
-       handlerWrapper  *atomicHandler
+       grpcCtx         context.Context
        srv             *http.Server
+       grpcCancel      context.CancelFunc
+       handlerWrapper  *atomicHandler
+       grpcTLSReloader *pkgtls.Reloader
        stopCh          chan struct{}
        gwMux           *runtime.ServeMux
        grpcClient      atomic.Pointer[healthcheck.Client]
-       grpcCtx         context.Context
-       grpcCancel      context.CancelFunc
+       l               *logger.Logger
+       tlsReloader     *pkgtls.Reloader
        host            string
        listenAddr      string
        grpcAddr        string
        keyFile         string
        certFile        string
        grpcCert        string
+       grpcMu          sync.Mutex
        port            uint32
        tls             bool
 }
@@ -190,17 +192,19 @@ func (p *server) PreRun(_ context.Context) error {
                                        func() {
                                                p.l.Info().Msg("Processing 
certificate update after debounce")
 
-                                               // Cancel existing gRPC 
connections
+                                               // Safely handle the context 
and cancel function
+                                               p.grpcMu.Lock()
                                                prevGRPCCancel := p.grpcCancel
-                                               if prevGRPCCancel != nil {
-                                                       defer func() {
-                                                               
p.l.Info().Msg("Canceling existing gRPC connections")
-                                                               prevGRPCCancel()
-                                                       }()
-                                               }
 
                                                // Create a new context for the 
new connections
                                                p.grpcCtx, p.grpcCancel = 
context.WithCancel(context.Background())
+                                               p.grpcMu.Unlock()
+
+                                               // Cancel existing gRPC 
connections
+                                               if prevGRPCCancel != nil {
+                                                       
p.l.Info().Msg("Canceling existing gRPC connections")
+                                                       prevGRPCCancel()
+                                               }
 
                                                // Force a short delay to 
ensure all resources are properly cleaned up
                                                time.Sleep(200 * 
time.Millisecond)
@@ -244,7 +248,9 @@ func (p *server) PreRun(_ context.Context) error {
 }
 
 func (p *server) Serve() run.StopNotify {
+       p.grpcMu.Lock()
        p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background())
+       p.grpcMu.Unlock()
 
        // Start TLS reloader for HTTP server
        if p.tls && p.tlsReloader != nil {
@@ -321,7 +327,11 @@ func (p *server) initGRPCClient() error {
        }
 
        // Create health check client
-       client, err := healthcheck.NewClient(p.grpcCtx, p.l, p.grpcAddr, opts)
+       p.grpcMu.Lock()
+       ctx := p.grpcCtx
+       p.grpcMu.Unlock()
+
+       client, err := healthcheck.NewClient(ctx, p.l, p.grpcAddr, opts)
        if err != nil {
                return errors.Wrap(err, "failed to create health check client")
        }
@@ -372,9 +382,17 @@ func (p *server) GracefulStop() {
        if p.grpcTLSReloader != nil {
                p.grpcTLSReloader.Stop()
        }
+
+       p.grpcMu.Lock()
+       var cancel context.CancelFunc
        if p.grpcCancel != nil {
-               p.grpcCancel()
+               cancel = p.grpcCancel
        }
+       p.grpcMu.Unlock()
+       if cancel != nil {
+               cancel()
+       }
+
        if err := p.srv.Close(); err != nil {
                p.l.Error().Err(err)
        }

Reply via email to