This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch race in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 163c696d6253c9906f6a7e503a3671fa92c3bfc5 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue May 20 15:21:13 2025 +0800 Fix the data race Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/liaison/http/server.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 8f23fc77..3c95fb20 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -25,6 +25,7 @@ import ( "net/http" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -79,6 +80,7 @@ type server struct { stopCh chan struct{} gwMux *runtime.ServeMux grpcClient atomic.Pointer[healthcheck.Client] + grpcMu sync.Mutex grpcCtx context.Context grpcCancel context.CancelFunc host string @@ -190,17 +192,19 @@ func (p *server) PreRun(_ context.Context) error { func() { p.l.Info().Msg("Processing certificate update after debounce") - // Cancel existing gRPC connections + // Safely handle the context and cancel function + p.grpcMu.Lock() prevGRPCCancel := p.grpcCancel - if prevGRPCCancel != nil { - defer func() { - p.l.Info().Msg("Canceling existing gRPC connections") - prevGRPCCancel() - }() - } // Create a new context for the new connections p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background()) + p.grpcMu.Unlock() + + // Cancel existing gRPC connections + if prevGRPCCancel != nil { + p.l.Info().Msg("Canceling existing gRPC connections") + prevGRPCCancel() + } // Force a short delay to ensure all resources are properly cleaned up time.Sleep(200 * time.Millisecond) @@ -244,7 +248,9 @@ func (p *server) PreRun(_ context.Context) error { } func (p *server) Serve() run.StopNotify { + p.grpcMu.Lock() p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background()) + p.grpcMu.Unlock() // Start TLS reloader for HTTP server if p.tls && p.tlsReloader != nil { @@ -321,7 +327,11 @@ func (p *server) initGRPCClient() error { } // Create health check client - client, err := healthcheck.NewClient(p.grpcCtx, p.l, p.grpcAddr, opts) + p.grpcMu.Lock() + ctx := p.grpcCtx + p.grpcMu.Unlock() + + client, err := healthcheck.NewClient(ctx, p.l, p.grpcAddr, opts) if err != nil { return errors.Wrap(err, "failed to create health check client") } @@ -372,9 +382,13 @@ func (p *server) GracefulStop() { if p.grpcTLSReloader != nil { p.grpcTLSReloader.Stop() } - if p.grpcCancel != nil { - p.grpcCancel() + + p.grpcMu.Lock() + if cancel := p.grpcCancel; cancel != nil { + cancel() } + p.grpcMu.Unlock() + if err := p.srv.Close(); err != nil { p.l.Error().Err(err) }