This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch race
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 163c696d6253c9906f6a7e503a3671fa92c3bfc5
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue May 20 15:21:13 2025 +0800

    Fix the data race
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/liaison/http/server.go | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 8f23fc77..3c95fb20 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -25,6 +25,7 @@ import (
        "net/http"
        "strconv"
        "strings"
+       "sync"
        "sync/atomic"
        "time"
 
@@ -79,6 +80,7 @@ type server struct {
        stopCh          chan struct{}
        gwMux           *runtime.ServeMux
        grpcClient      atomic.Pointer[healthcheck.Client]
+       grpcMu          sync.Mutex
        grpcCtx         context.Context
        grpcCancel      context.CancelFunc
        host            string
@@ -190,17 +192,19 @@ func (p *server) PreRun(_ context.Context) error {
                                        func() {
                                                p.l.Info().Msg("Processing certificate update after debounce")
 
-                                               // Cancel existing gRPC connections
+                                               // Safely handle the context and cancel function
+                                               p.grpcMu.Lock()
                                                prevGRPCCancel := p.grpcCancel
-                                               if prevGRPCCancel != nil {
-                                                       defer func() {
-                                                               p.l.Info().Msg("Canceling existing gRPC connections")
-                                                               prevGRPCCancel()
-                                                       }()
-                                               }
 
                                                // Create a new context for the new connections
                                                p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background())
+                                               p.grpcMu.Unlock()
+
+                                               // Cancel existing gRPC connections
+                                               if prevGRPCCancel != nil {
+                                                       p.l.Info().Msg("Canceling existing gRPC connections")
+                                                       prevGRPCCancel()
+                                               }
 
                                                // Force a short delay to ensure all resources are properly cleaned up
                                                time.Sleep(200 * time.Millisecond)
@@ -244,7 +248,9 @@ func (p *server) PreRun(_ context.Context) error {
 }
 
 func (p *server) Serve() run.StopNotify {
+       p.grpcMu.Lock()
        p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background())
+       p.grpcMu.Unlock()
 
        // Start TLS reloader for HTTP server
        if p.tls && p.tlsReloader != nil {
@@ -321,7 +327,11 @@ func (p *server) initGRPCClient() error {
        }
 
        // Create health check client
-       client, err := healthcheck.NewClient(p.grpcCtx, p.l, p.grpcAddr, opts)
+       p.grpcMu.Lock()
+       ctx := p.grpcCtx
+       p.grpcMu.Unlock()
+
+       client, err := healthcheck.NewClient(ctx, p.l, p.grpcAddr, opts)
        if err != nil {
                return errors.Wrap(err, "failed to create health check client")
        }
@@ -372,9 +382,13 @@ func (p *server) GracefulStop() {
        if p.grpcTLSReloader != nil {
                p.grpcTLSReloader.Stop()
        }
-       if p.grpcCancel != nil {
-               p.grpcCancel()
+
+       p.grpcMu.Lock()
+       if cancel := p.grpcCancel; cancel != nil {
+               cancel()
        }
+       p.grpcMu.Unlock()
+
        if err := p.srv.Close(); err != nil {
                p.l.Error().Err(err)
        }

Reply via email to