AlexStocks commented on code in PR #3235:
URL: https://github.com/apache/dubbo-go/pull/3235#discussion_r2969087027


##########
graceful_shutdown/closing_ack.go:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graceful_shutdown
+
+import (
+       "strings"
+       "sync"
+       "sync/atomic"
+)
+
+// ClosingAckStats is a lightweight, consumer-side statistic for active 
closing notices.
+// Received means the consumer observed an active notice; Removed means 
directory removal
+// succeeded; Missed means the notice was observed but did not match a local 
directory entry.
+type ClosingAckStats struct {
+       Received uint64
+       Removed  uint64
+       Missed   uint64
+}
+
+type closingAckCounter struct {
+       received atomic.Uint64
+       removed  atomic.Uint64
+       missed   atomic.Uint64
+}
+
+type closingAckTracker struct {
+       counters sync.Map // map[string]*closingAckCounter
+}
+
+var defaultClosingAckTracker = &closingAckTracker{}
+
+func isActiveClosingSource(source string) bool {
+       return strings.HasSuffix(source, "-health-watch")
+}
+
+func (t *closingAckTracker) record(event ClosingEvent, removed bool) {
+       if !isActiveClosingSource(event.Source) {
+               return
+       }
+
+       counter := t.getOrCreateCounter(event.Source)
+       counter.received.Add(1)
+       if removed {
+               counter.removed.Add(1)
+               return
+       }
+       counter.missed.Add(1)
+}
+
+func (t *closingAckTracker) getOrCreateCounter(source string) 
*closingAckCounter {
+       if value, ok := t.counters.Load(source); ok {
+               return value.(*closingAckCounter)
+       }
+       counter := &closingAckCounter{}
+       actual, _ := t.counters.LoadOrStore(source, counter)
+       return actual.(*closingAckCounter)
+}
+
+func (t *closingAckTracker) snapshot() map[string]ClosingAckStats {
+       stats := make(map[string]ClosingAckStats)
+       t.counters.Range(func(key, value any) bool {
+               counter := value.(*closingAckCounter)
+               stats[key.(string)] = ClosingAckStats{
+                       Received: counter.received.Load(),
+                       Removed:  counter.removed.Load(),
+                       Missed:   counter.missed.Load(),
+               }
+               return true
+       })
+       return stats
+}
+
+func (t *closingAckTracker) reset() {
+       t.counters = sync.Map{}
+}
+
+// DefaultClosingAckStats returns the process-wide active-notice ack 
statistics.
+func DefaultClosingAckStats() map[string]ClosingAckStats {

Review Comment:
   ## closing_ack.go:93 - reset() 存在竞态条件
   
   `t.counters = sync.Map{}` 直接赋值替换不是原子操作,也没有任何同步保护。如果其他 goroutine 同时调用 record() 或 
snapshot(),就会与这次写入产生数据竞争(data race),并可能导致统计数据不一致。
   
   **建议**: 使用 sync.Mutex 保护 reset() 操作,或者提供一个带锁的 Clear() 方法。



##########
graceful_shutdown/shutdown.go:
##########
@@ -125,31 +140,125 @@ func totalTimeout(shutdown *global.ShutdownConfig) 
time.Duration {
 }
 
 func beforeShutdown(shutdown *global.ShutdownConfig) {
-       destroyRegistries()
+       // 1. mark closing state
+       logger.Info("Graceful shutdown --- Mark closing state.")
+       shutdown.Closing.Store(true)
+
+       // 2. unregister services from registries
+       unregisterRegistries()
+
+       // 3. notify long connection consumers
+       notifyLongConnectionConsumers(shutdown)
+
+       // 4. wait and accept new requests
        // waiting for a short time so that the clients have enough time to get 
the notification that server shutdowns
        // The value of configuration depends on how long the clients will get 
notification.
        waitAndAcceptNewRequests(shutdown)
 
+       // 5. reject new requests and wait for in-flight requests
        // reject sending/receiving the new request but keeping waiting for 
accepting requests
        waitForSendingAndReceivingRequests(shutdown)
 
-       // destroy all protocols
+       // 6. destroy protocols
        destroyProtocols()
 
-       logger.Info("Graceful shutdown --- Execute the custom callbacks.")
-       customCallbacks := extension.GetAllCustomShutdownCallbacks()
-       for callback := customCallbacks.Front(); callback != nil; callback = 
callback.Next() {
-               callback.Value.(func())()
-       }
+       // 7. execute custom callbacks
+       executeCustomShutdownCallbacks(shutdown)
 }
 
-// destroyRegistries destroys RegistryProtocol directly.
-func destroyRegistries() {
-       logger.Info("Graceful shutdown --- Destroy all registriesConfig. ")
-       registryProtocol := extension.GetProtocol(constant.RegistryProtocol)
+// unregisterRegistries unregisters exported services from registries during 
graceful shutdown.
+// If the registry protocol does not expose a narrower unregister capability, 
it falls back to Destroy.
+func unregisterRegistries() {
+       logger.Info("Graceful shutdown --- Unregister exported services from 
registries.")
+       registryProtocol, ok := getProtocolSafely(constant.RegistryProtocol)
+       if !ok {
+               logger.Warnf("Graceful shutdown --- Registry protocol %s is not 
registered, skip unregistering registries.", constant.RegistryProtocol)
+               return
+       }
+
+       if unregisterer, ok := 
registryProtocol.(protocolbase.RegistryUnregisterer); ok {
+               unregisterer.UnregisterRegistries()
+               return
+       }
+
+       logger.Warnf("Graceful shutdown --- Registry protocol %s does not 
support unregister-only shutdown, falling back to Destroy().", 
constant.RegistryProtocol)
        registryProtocol.Destroy()
 }
 
+// notifyLongConnectionConsumers notifies all connected consumers via long 
connections
+func notifyLongConnectionConsumers(shutdown *global.ShutdownConfig) {
+       logger.Info("Graceful shutdown --- Notify long connection consumers.")
+
+       notifyTimeout := parseDuration(shutdown.NotifyTimeout, 
notifyTimeoutDesc, defaultNotifyTimeout)
+       callbacks := extension.GracefulShutdownCallbacks()
+       var wg sync.WaitGroup
+       for name, callback := range callbacks {
+               wg.Add(1)
+               go func(name string, callback 
extension.GracefulShutdownCallback) {
+                       defer wg.Done()
+                       ctx, cancel := 
context.WithTimeout(context.Background(), notifyTimeout)
+                       defer cancel()
+                       notifyWithRetry(ctx, name, callback)
+               }(name, callback)
+       }
+       wg.Wait()
+}
+
+// notifyWithRetry notifies with exponential backoff retry
+func notifyWithRetry(ctx context.Context, name string, callback 
extension.GracefulShutdownCallback) {
+       backOff := backoff.NewExponentialBackOff()
+       backOff.InitialInterval = defaultRetryBaseDelay
+       backOff.MaxInterval = defaultRetryMaxDelay
+       backOff.MaxElapsedTime = 0
+
+       var attempts int
+       operation := func() error {
+               attempts++
+               err := invokeGracefulShutdownCallback(ctx, name, callback)
+               if err == nil {
+                       logger.Infof("Graceful shutdown --- Notify %s 
completed", name)
+                       return nil
+               }
+
+               logger.Warnf("Graceful shutdown --- Notify %s attempt %d failed 
--- %v", name, attempts, err)
+               return err
+       }
+
+       notify := func(err error, delay time.Duration) {
+               logger.Infof("Graceful shutdown --- Notify %s retrying in %v 
(attempt %d/%d)", name, delay, attempts, defaultMaxRetries)
+       }
+
+       retryPolicy := backoff.WithContext(backoff.WithMaxRetries(backOff, 
uint64(defaultMaxRetries)), ctx)
+       if err := backoff.RetryNotify(operation, retryPolicy, notify); err != 
nil {
+               if ctx.Err() != nil {
+                       logger.Warnf("Graceful shutdown --- Notify %s timeout 
after %d attempts, continuing...", name, attempts)
+                       return
+               }
+
+               logger.Warnf("Graceful shutdown --- Notify %s failed after %d 
attempts --- %v", name, attempts, err)
+       }
+}
+
+func invokeGracefulShutdownCallback(ctx context.Context, name string, callback 
extension.GracefulShutdownCallback) error {
+       done := make(chan error, 1)
+       go func() {
+               defer func() {
+                       if recovered := recover(); recovered != nil {

Review Comment:
   ## shutdown.go:181-198 - invokeGracefulShutdownCallback panic recovery 可能导致死锁
   
   如果 callback(ctx) 执行时 panic,done 不会被发送,调用方会一直等待 ctx.Done()。虽然有 recover 防止进程崩溃,但 
panic 之后调用方对 done 的接收会一直阻塞,直到 ctx 超时或被取消。
   
   **建议**: 在 recover 中确保 done 被发送,或者使用 defer 保证执行。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to