AlexStocks commented on code in PR #3235:
URL: https://github.com/apache/dubbo-go/pull/3235#discussion_r2902465650
##########
graceful_shutdown/shutdown.go:
##########
@@ -150,6 +167,71 @@ func destroyRegistries() {
registryProtocol.Destroy()
}
+// notifyLongConnectionConsumers notifies all connected consumers via long
connections
+func notifyLongConnectionConsumers() {
+ logger.Info("Graceful shutdown --- Notify long connection consumers.")
+
+ notifyTimeout := 3 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), notifyTimeout)
+ defer cancel()
+
+ callbacks := extension.GetAllGracefulShutdownCallbacks()
+
+ for name, callback := range callbacks {
+ notifyWithRetry(ctx, name, callback)
+ }
+}
+
+// notifyWithRetry notifies with exponential backoff retry
+func notifyWithRetry(ctx context.Context, name string, callback
extension.GracefulShutdownCallback) {
+ maxRetries := defaultMaxRetries
+ baseDelay := defaultRetryBaseDelay
+ maxDelay := defaultRetryMaxDelay
+
+ var lastErr error
+ for attempt := 0; attempt <= maxRetries; attempt++ {
+ // check timeout
+ select {
+ case <-ctx.Done():
+ logger.Warnf("Graceful shutdown --- Notify %s timeout
after %d attempts, continuing...", name, attempt)
+ return
+ default:
+ }
+
+ err := callback(ctx)
+ if err == nil {
+ logger.Infof("Graceful shutdown --- Notify %s
completed", name)
+ return
+ }
+
+ lastErr = err
+ logger.Warnf("Graceful shutdown --- Notify %s attempt %d
failed: %v", name, attempt+1, err)
+
+ // retry with exponential backoff
+ if attempt < maxRetries {
+ delay := baseDelay
+ for i := 0; i < attempt; i++ {
+ delay *= 2
Review Comment:
三次重试的总等待时间:500ms + 1s + 2s = 3.5s,超过了外层 context timeout 3s,实际最多只能完成 1-2 次重试,配置自相矛盾。
另外 `go.mod` 里本就有 `github.com/cenkalti/backoff/v4` 依赖,不需要手写指数退避逻辑,直接用更可靠。
建议:要么把 `notifyTimeout` 调大(至少 10s),要么减少重试次数/延迟;退避逻辑改用 `cenkalti/backoff/v4`。
##########
filter/graceful_shutdown/consumer_filter.go:
##########
@@ -91,3 +112,89 @@ func (f *consumerGracefulShutdownFilter) Set(name string,
conf any) {
// do nothing
}
}
+
+// isClosingInvoker checks if invoker is in closing list
+func (f *consumerGracefulShutdownFilter) isClosingInvoker(invoker
base.Invoker) bool {
+ key := invoker.GetURL().String()
+ if expireTime, ok := f.closingInvokers.Load(key); ok {
+ if time.Now().Before(expireTime.(time.Time)) {
+ return true
+ }
+ f.closingInvokers.Delete(key)
+ }
+ return false
+}
+
+// isClosingResponse checks if response contains closing flag
+func (f *consumerGracefulShutdownFilter) isClosingResponse(result
result.Result) bool {
+ if result != nil && result.Attachments() != nil {
+ if v, ok :=
result.Attachments()[constant.GracefulShutdownClosingKey]; ok {
+ if v == "true" {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// markClosingInvoker marks invoker as closing and sets available=false
+func (f *consumerGracefulShutdownFilter) markClosingInvoker(invoker
base.Invoker) {
+ key := invoker.GetURL().String()
+ expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
+ f.closingInvokers.Store(key, expireTime)
+
+ logger.Infof("Graceful shutdown: marked invoker as closing: %s, will
expire at %v, IsAvailable=%v",
+ key, expireTime, invoker.IsAvailable())
+
+ if bi, ok := invoker.(*base.BaseInvoker); ok {
+ bi.SetAvailable(false)
+ logger.Infof("Graceful shutdown: set invoker unavailable: %s,
IsAvailable now=%v",
+ key, invoker.IsAvailable())
+ }
+}
+
+func (f *consumerGracefulShutdownFilter) getClosingInvokerExpireTime()
time.Duration {
+ if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime
!= "" {
+ if duration, err :=
time.ParseDuration(f.shutdownConfig.ClosingInvokerExpireTime); err == nil &&
duration > 0 {
+ return duration
+ }
+ }
+ // default 30s, also try parsing numeric string as milliseconds
+ if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime
!= "" {
+ if ms, err :=
strconv.ParseInt(f.shutdownConfig.ClosingInvokerExpireTime, 10, 64); err == nil
&& ms > 0 {
+ return time.Duration(ms) * time.Millisecond
+ }
+ }
+ return 30 * time.Second
+}
+
+// handleRequestError handles request errors and marks invoker as unavailable
for connection errors
+func (f *consumerGracefulShutdownFilter) handleRequestError(invoker
base.Invoker, err error) {
+ if err == nil {
+ return
+ }
+
+ // check for connection-related errors
+ errMsg := err.Error()
+ isConnectionError := strings.Contains(errMsg, "client has closed") ||
+ strings.Contains(errMsg, "connection") ||
Review Comment:
`strings.Contains(errMsg, "connection")` 过于宽泛,任何包含 "connection" 字样的业务错误(如 "database connection pool exhausted"、"connection refused by business logic")都会被误判为连接关闭错误,将正常 invoker 标记为 closing 并 `SetAvailable(false)`,导致该 invoker 在 30s 过期前一直拒绝请求。
建议:改用 `errors.Is` 对已知错误类型匹配,或缩窄字符串匹配为 `"connection reset"`、`"use of closed network connection"` 等具体错误。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]