Alanxtl commented on code in PR #3235:
URL: https://github.com/apache/dubbo-go/pull/3235#discussion_r2902930215


##########
protocol/base/base_invoker.go:
##########
@@ -89,6 +89,11 @@ func (bi *BaseInvoker) IsAvailable() bool {
        return bi.available.Load()
 }
 
+// SetAvailable sets available flag
+func (bi *BaseInvoker) SetAvailable(available bool) {
+       bi.available.Store(available)
+}

Review Comment:
   这个没必要单独封装一个函数,之前都是直接调用 `available.Store(...)` 的



##########
filter/graceful_shutdown/consumer_filter.go:
##########
@@ -91,3 +112,89 @@ func (f *consumerGracefulShutdownFilter) Set(name string, 
conf any) {
                // do nothing
        }
 }
+
+// isClosingInvoker checks if invoker is in closing list
+func (f *consumerGracefulShutdownFilter) isClosingInvoker(invoker 
base.Invoker) bool {
+       key := invoker.GetURL().String()
+       if expireTime, ok := f.closingInvokers.Load(key); ok {
+               if time.Now().Before(expireTime.(time.Time)) {
+                       return true
+               }
+               f.closingInvokers.Delete(key)
+       }
+       return false
+}
+
+// isClosingResponse checks if response contains closing flag
+func (f *consumerGracefulShutdownFilter) isClosingResponse(result 
result.Result) bool {
+       if result != nil && result.Attachments() != nil {
+               if v, ok := 
result.Attachments()[constant.GracefulShutdownClosingKey]; ok {
+                       if v == "true" {
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+// markClosingInvoker marks invoker as closing and sets available=false
+func (f *consumerGracefulShutdownFilter) markClosingInvoker(invoker 
base.Invoker) {
+       key := invoker.GetURL().String()
+       expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
+       f.closingInvokers.Store(key, expireTime)
+
+       logger.Infof("Graceful shutdown: marked invoker as closing: %s, will 
expire at %v, IsAvailable=%v",
+               key, expireTime, invoker.IsAvailable())
+
+       if bi, ok := invoker.(*base.BaseInvoker); ok {
+               bi.SetAvailable(false)
+               logger.Infof("Graceful shutdown: set invoker unavailable: %s, 
IsAvailable now=%v",
+                       key, invoker.IsAvailable())
+       }
+}
+
+func (f *consumerGracefulShutdownFilter) getClosingInvokerExpireTime() 
time.Duration {
+       if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime 
!= "" {
+               if duration, err := 
time.ParseDuration(f.shutdownConfig.ClosingInvokerExpireTime); err == nil && 
duration > 0 {
+                       return duration
+               }
+       }
+       // default 30s, also try parsing numeric string as milliseconds
+       if f.shutdownConfig != nil && f.shutdownConfig.ClosingInvokerExpireTime 
!= "" {
+               if ms, err := 
strconv.ParseInt(f.shutdownConfig.ClosingInvokerExpireTime, 10, 64); err == nil 
&& ms > 0 {
+                       return time.Duration(ms) * time.Millisecond
+               }
+       }
+       return 30 * time.Second
+}
+
+// handleRequestError handles request errors and marks invoker as unavailable 
for connection errors
+func (f *consumerGracefulShutdownFilter) handleRequestError(invoker 
base.Invoker, err error) {
+       if err == nil {
+               return
+       }
+
+       // check for connection-related errors
+       errMsg := err.Error()
+       isConnectionError := strings.Contains(errMsg, "client has closed") ||
+               strings.Contains(errMsg, "connection") ||
+               strings.Contains(errMsg, "EOF") ||
+               strings.Contains(errMsg, "broken pipe") ||
+               strings.Contains(errMsg, "gRPC") && strings.Contains(errMsg, 
"closing") ||
+               strings.Contains(errMsg, "http2") && strings.Contains(errMsg, 
"close")
+
+       if isConnectionError {
+               key := invoker.GetURL().String()
+               expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
+               f.closingInvokers.Store(key, expireTime)
+
+               logger.Infof("Graceful shutdown: connection error detected for 
invoker: %s, marking as closing, will expire at %v, IsAvailable=%v",

Review Comment:
   其他的日志也都按同样的格式(`Graceful shutdown --- ...`)改一下



##########
filter/graceful_shutdown/consumer_filter.go:
##########
@@ -91,3 +112,89 @@ func (f *consumerGracefulShutdownFilter) Set(name string, 
conf any) {
                // do nothing
        }
 }
+
+// isClosingInvoker checks if invoker is in closing list
+func (f *consumerGracefulShutdownFilter) isClosingInvoker(invoker 
base.Invoker) bool {
+       key := invoker.GetURL().String()
+       if expireTime, ok := f.closingInvokers.Load(key); ok {
+               if time.Now().Before(expireTime.(time.Time)) {
+                       return true
+               }
+               f.closingInvokers.Delete(key)
+       }
+       return false
+}
+
+// isClosingResponse checks if response contains closing flag
+func (f *consumerGracefulShutdownFilter) isClosingResponse(result 
result.Result) bool {
+       if result != nil && result.Attachments() != nil {
+               if v, ok := 
result.Attachments()[constant.GracefulShutdownClosingKey]; ok {
+                       if v == "true" {
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+// markClosingInvoker marks invoker as closing and sets available=false
+func (f *consumerGracefulShutdownFilter) markClosingInvoker(invoker 
base.Invoker) {
+       key := invoker.GetURL().String()
+       expireTime := time.Now().Add(f.getClosingInvokerExpireTime())
+       f.closingInvokers.Store(key, expireTime)
+
+       logger.Infof("Graceful shutdown: marked invoker as closing: %s, will 
expire at %v, IsAvailable=%v",
+               key, expireTime, invoker.IsAvailable())
+
+       if bi, ok := invoker.(*base.BaseInvoker); ok {
+               bi.SetAvailable(false)
+               logger.Infof("Graceful shutdown: set invoker unavailable: %s, 
IsAvailable now=%v",

Review Comment:
   ```suggestion
                logger.Infof("Graceful shutdown --- Set invoker unavailable: 
%s, IsAvailable now=%v",
   ```



##########
common/extension/graceful_shutdown.go:
##########
@@ -19,35 +19,41 @@ package extension
 
 import (
        "container/list"
+       "context"
 )
 
-var customShutdownCallbacks = list.New()
-
-/**
- * AddCustomShutdownCallback
- * you should not make any assumption about the order.
- * For example, if you have more than one callbacks, and you wish the order is:
- * callback1()
- * callback2()
- * ...
- * callbackN()
- * Then you should put then together:
- * func callback() {
- *     callback1()
- *     callback2()
- *     ...
- *     callbackN()
- * }
- * I think the order of custom callbacks should be decided by the users.
- * Even though I can design a mechanism to support the ordered custom 
callbacks,
- * the benefit of that mechanism is low.
- * And it may introduce much complication for another users.
- */

Review Comment:
   这个注释也别删



##########
graceful_shutdown/shutdown.go:
##########
@@ -42,6 +43,10 @@ const (
        defaultStepTimeout                 = 3 * time.Second
        defaultConsumerUpdateWaitTime      = 3 * time.Second
        defaultOfflineRequestWindowTimeout = 3 * time.Second
+       // retry config

Review Comment:
   ```suggestion
   
        // retry config
   ```



##########
common/extension/graceful_shutdown.go:
##########
@@ -19,35 +19,41 @@ package extension
 
 import (
        "container/list"
+       "context"
 )
 
-var customShutdownCallbacks = list.New()
-
-/**
- * AddCustomShutdownCallback
- * you should not make any assumption about the order.
- * For example, if you have more than one callbacks, and you wish the order is:
- * callback1()
- * callback2()
- * ...
- * callbackN()
- * Then you should put then together:
- * func callback() {
- *     callback1()
- *     callback2()
- *     ...
- *     callbackN()
- * }
- * I think the order of custom callbacks should be decided by the users.
- * Even though I can design a mechanism to support the ordered custom 
callbacks,
- * the benefit of that mechanism is low.
- * And it may introduce much complication for another users.
- */
+// GracefulShutdownCallback is the callback for graceful shutdown
+// name: protocol name such as "grpc", "tri", "dubbo"
+// returns error if notify failed
+type GracefulShutdownCallback func(ctx context.Context) error
+
+var (
+       customShutdownCallbacks   = list.New()
+       gracefulShutdownCallbacks = make(map[string]GracefulShutdownCallback)
+)
+
+// AddCustomShutdownCallback adds custom shutdown callback
 func AddCustomShutdownCallback(callback func()) {
        customShutdownCallbacks.PushBack(callback)
 }
 
-// GetAllCustomShutdownCallbacks gets all custom shutdown callbacks
+// GetAllCustomShutdownCallbacks returns all custom shutdown callbacks
 func GetAllCustomShutdownCallbacks() *list.List {
        return customShutdownCallbacks
 }
+
+// SetGracefulShutdownCallback sets protocol-level graceful shutdown callback
+func SetGracefulShutdownCallback(name string, f GracefulShutdownCallback) {
+       gracefulShutdownCallbacks[name] = f
+}
+
+// GetGracefulShutdownCallback returns protocol's graceful shutdown callback
+func GetGracefulShutdownCallback(name string) (GracefulShutdownCallback, bool) 
{
+       f, ok := gracefulShutdownCallbacks[name]
+       return f, ok
+}
+
+// GetAllGracefulShutdownCallbacks returns all protocol's graceful shutdown 
callbacks
+func GetAllGracefulShutdownCallbacks() map[string]GracefulShutdownCallback {
+       return gracefulShutdownCallbacks
+}

Review Comment:
   1. getter和setter风格的命名不是go的习惯,可以考虑改成
   ```go
   func RegisterGracefulShutdownCallback(name string, f 
GracefulShutdownCallback)
   func LookupGracefulShutdownCallback(name string) (GracefulShutdownCallback, 
bool)
   func GracefulShutdownCallbacks() map[string]GracefulShutdownCallback
   ```
   
   2. 并发访问问题需要再考虑一下:`gracefulShutdownCallbacks` 是包级 map,目前读写都没有加锁,并发注册/查询会有 data race(可以考虑用 `sync.RWMutex` 保护)
   3. 这个SetGracefulShutdownCallback是否允许重复注册,如:
   ```go
   extension.SetGracefulShutdownCallback(GRPC, cb1)
   extension.SetGracefulShutdownCallback(GRPC, cb2)
   ```
   如果不允许需要加一下判断
   
   4. 不要直接把内部 map 返回出去,如果需要的话可以返回一个拷贝



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to