AlexStocks commented on code in PR #3235: URL: https://github.com/apache/dubbo-go/pull/3235#discussion_r2969087027
########## graceful_shutdown/closing_ack.go: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package graceful_shutdown + +import ( + "strings" + "sync" + "sync/atomic" +) + +// ClosingAckStats is a lightweight, consumer-side statistic for active closing notices. +// Received means the consumer observed an active notice; Removed means directory removal +// succeeded; Missed means the notice was observed but did not match a local directory entry. +type ClosingAckStats struct { + Received uint64 + Removed uint64 + Missed uint64 +} + +type closingAckCounter struct { + received atomic.Uint64 + removed atomic.Uint64 + missed atomic.Uint64 +} + +type closingAckTracker struct { + counters sync.Map // map[string]*closingAckCounter +} + +var defaultClosingAckTracker = &closingAckTracker{} + +func isActiveClosingSource(source string) bool { + return strings.HasSuffix(source, "-health-watch") +} + +func (t *closingAckTracker) record(event ClosingEvent, removed bool) { + if !isActiveClosingSource(event.Source) { + return + } + + counter := t.getOrCreateCounter(event.Source) + counter.received.Add(1) + if removed { + counter.removed.Add(1) + return + } + counter.missed.Add(1) +} + +func (t *closingAckTracker) getOrCreateCounter(source string) *closingAckCounter { + if value, ok := t.counters.Load(source); ok { + return value.(*closingAckCounter) + } + counter := &closingAckCounter{} + actual, _ := t.counters.LoadOrStore(source, counter) + return actual.(*closingAckCounter) +} + +func (t *closingAckTracker) snapshot() map[string]ClosingAckStats { + stats := make(map[string]ClosingAckStats) + t.counters.Range(func(key, value any) bool { + counter := value.(*closingAckCounter) + stats[key.(string)] = ClosingAckStats{ + Received: counter.received.Load(), + Removed: counter.removed.Load(), + Missed: counter.missed.Load(), + } + return true + }) + return stats +} + +func (t *closingAckTracker) reset() { + t.counters = sync.Map{} +} + +// DefaultClosingAckStats returns the process-wide active-notice ack statistics. +func DefaultClosingAckStats() map[string]ClosingAckStats { Review Comment: ## closing_ack.go:93 - reset() 存在竞态条件 `t.counters = sync.Map{}` 直接赋值替换不是原子操作。如果其他 goroutine 同时调用 record() 或 snapshot(),可能导致数据不一致。 **建议**: 使用 sync.Mutex 保护 reset() 操作,或者提供一个带锁的 Clear() 方法。 ########## graceful_shutdown/shutdown.go: ########## @@ -125,31 +140,125 @@ func totalTimeout(shutdown *global.ShutdownConfig) time.Duration { } func beforeShutdown(shutdown *global.ShutdownConfig) { - destroyRegistries() + // 1. mark closing state + logger.Info("Graceful shutdown --- Mark closing state.") + shutdown.Closing.Store(true) + + // 2. unregister services from registries + unregisterRegistries() + + // 3. notify long connection consumers + notifyLongConnectionConsumers(shutdown) + + // 4. wait and accept new requests // waiting for a short time so that the clients have enough time to get the notification that server shutdowns // The value of configuration depends on how long the clients will get notification. waitAndAcceptNewRequests(shutdown) + // 5. reject new requests and wait for in-flight requests // reject sending/receiving the new request but keeping waiting for accepting requests waitForSendingAndReceivingRequests(shutdown) - // destroy all protocols + // 6. destroy protocols destroyProtocols() - logger.Info("Graceful shutdown --- Execute the custom callbacks.") - customCallbacks := extension.GetAllCustomShutdownCallbacks() - for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() { - callback.Value.(func())() - } + // 7. execute custom callbacks + executeCustomShutdownCallbacks(shutdown) } -// destroyRegistries destroys RegistryProtocol directly. -func destroyRegistries() { - logger.Info("Graceful shutdown --- Destroy all registriesConfig. ") - registryProtocol := extension.GetProtocol(constant.RegistryProtocol) +// unregisterRegistries unregisters exported services from registries during graceful shutdown. +// If the registry protocol does not expose a narrower unregister capability, it falls back to Destroy. +func unregisterRegistries() { + logger.Info("Graceful shutdown --- Unregister exported services from registries.") + registryProtocol, ok := getProtocolSafely(constant.RegistryProtocol) + if !ok { + logger.Warnf("Graceful shutdown --- Registry protocol %s is not registered, skip unregistering registries.", constant.RegistryProtocol) + return + } + + if unregisterer, ok := registryProtocol.(protocolbase.RegistryUnregisterer); ok { + unregisterer.UnregisterRegistries() + return + } + + logger.Warnf("Graceful shutdown --- Registry protocol %s does not support unregister-only shutdown, falling back to Destroy().", constant.RegistryProtocol) registryProtocol.Destroy() } +// notifyLongConnectionConsumers notifies all connected consumers via long connections +func notifyLongConnectionConsumers(shutdown *global.ShutdownConfig) { + logger.Info("Graceful shutdown --- Notify long connection consumers.") + + notifyTimeout := parseDuration(shutdown.NotifyTimeout, notifyTimeoutDesc, defaultNotifyTimeout) + callbacks := extension.GracefulShutdownCallbacks() + var wg sync.WaitGroup + for name, callback := range callbacks { + wg.Add(1) + go func(name string, callback extension.GracefulShutdownCallback) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), notifyTimeout) + defer cancel() + notifyWithRetry(ctx, name, callback) + }(name, callback) + } + wg.Wait() +} + +// notifyWithRetry notifies with exponential backoff retry +func notifyWithRetry(ctx context.Context, name string, callback extension.GracefulShutdownCallback) { + backOff := backoff.NewExponentialBackOff() + backOff.InitialInterval = defaultRetryBaseDelay + backOff.MaxInterval = defaultRetryMaxDelay + backOff.MaxElapsedTime = 0 + + var attempts int + operation := func() error { + attempts++ + err := invokeGracefulShutdownCallback(ctx, name, callback) + if err == nil { + logger.Infof("Graceful shutdown --- Notify %s completed", name) + return nil + } + + logger.Warnf("Graceful shutdown --- Notify %s attempt %d failed --- %v", name, attempts, err) + return err + } + + notify := func(err error, delay time.Duration) { + logger.Infof("Graceful shutdown --- Notify %s retrying in %v (attempt %d/%d)", name, delay, attempts, defaultMaxRetries) + } + + retryPolicy := backoff.WithContext(backoff.WithMaxRetries(backOff, uint64(defaultMaxRetries)), ctx) + if err := backoff.RetryNotify(operation, retryPolicy, notify); err != nil { + if ctx.Err() != nil { + logger.Warnf("Graceful shutdown --- Notify %s timeout after %d attempts, continuing...", name, attempts) + return + } + + logger.Warnf("Graceful shutdown --- Notify %s failed after %d attempts --- %v", name, attempts, err) + } +} + +func invokeGracefulShutdownCallback(ctx context.Context, name string, callback extension.GracefulShutdownCallback) error { + done := make(chan error, 1) + go func() { + defer func() { + if recovered := recover(); recovered != nil { Review Comment: ## shutdown.go:181-198 - invokeGracefulShutdownCallback panic recovery 可能导致死锁 如果 callback(ctx) 执行时 panic,done 不会被发送,调用方会一直等待 ctx.Done()。虽然有 recover,但 panic 会导致 channel 永远阻塞。 **建议**: 在 recover 中确保 done 被发送,或者使用 defer 保证执行。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
