This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 248b052a Fix the data race (#673) 248b052a is described below commit 248b052a319eeefa880e6edd1c307ffaf2760a34 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue May 20 15:45:26 2025 +0800 Fix the data race (#673) --- banyand/liaison/http/server.go | 48 +++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 8f23fc77..a4e5fd05 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -25,6 +25,7 @@ import ( "net/http" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -71,22 +72,23 @@ type Server interface { type server struct { creds credentials.TransportCredentials - tlsReloader *pkgtls.Reloader - grpcTLSReloader *pkgtls.Reloader - l *logger.Logger - handlerWrapper *atomicHandler + grpcCtx context.Context srv *http.Server + grpcCancel context.CancelFunc + handlerWrapper *atomicHandler + grpcTLSReloader *pkgtls.Reloader stopCh chan struct{} gwMux *runtime.ServeMux grpcClient atomic.Pointer[healthcheck.Client] - grpcCtx context.Context - grpcCancel context.CancelFunc + l *logger.Logger + tlsReloader *pkgtls.Reloader host string listenAddr string grpcAddr string keyFile string certFile string grpcCert string + grpcMu sync.Mutex port uint32 tls bool } @@ -190,17 +192,19 @@ func (p *server) PreRun(_ context.Context) error { func() { p.l.Info().Msg("Processing certificate update after debounce") - // Cancel existing gRPC connections + // Safely handle the context and cancel function + p.grpcMu.Lock() prevGRPCCancel := p.grpcCancel - if prevGRPCCancel != nil { - defer func() { - p.l.Info().Msg("Canceling existing gRPC connections") - prevGRPCCancel() - }() - } // Create a new context for the new connections p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background()) + p.grpcMu.Unlock() + + // Cancel existing gRPC connections + if prevGRPCCancel != nil { + p.l.Info().Msg("Canceling existing gRPC connections") + prevGRPCCancel() + } // Force a short delay to ensure all resources are properly cleaned up time.Sleep(200 * time.Millisecond) @@ -244,7 +248,9 @@ func (p *server) PreRun(_ context.Context) error { } func (p *server) Serve() run.StopNotify { + p.grpcMu.Lock() p.grpcCtx, p.grpcCancel = context.WithCancel(context.Background()) + p.grpcMu.Unlock() // Start TLS reloader for HTTP server if p.tls && p.tlsReloader != nil { @@ -321,7 +327,11 @@ func (p *server) initGRPCClient() error { } // Create health check client - client, err := healthcheck.NewClient(p.grpcCtx, p.l, p.grpcAddr, opts) + p.grpcMu.Lock() + ctx := p.grpcCtx + p.grpcMu.Unlock() + + client, err := healthcheck.NewClient(ctx, p.l, p.grpcAddr, opts) if err != nil { return errors.Wrap(err, "failed to create health check client") } @@ -372,9 +382,17 @@ func (p *server) GracefulStop() { if p.grpcTLSReloader != nil { p.grpcTLSReloader.Stop() } + + p.grpcMu.Lock() + var cancel context.CancelFunc if p.grpcCancel != nil { - p.grpcCancel() + cancel = p.grpcCancel } + p.grpcMu.Unlock() + if cancel != nil { + cancel() + } + if err := p.srv.Close(); err != nil { p.l.Error().Err(err) }