AlexStocks commented on a change in pull request #1287:
URL: https://github.com/apache/dubbo-go/pull/1287#discussion_r661138258
##########
File path: protocol/rpc_status.go
##########
@@ -194,84 +187,190 @@ func CleanAllStatus() {
return true
}
serviceStatistic.Range(delete2)
- delete3 := func(key, _ interface{}) bool {
- invokerBlackList.Delete(key)
+ delete3 := func(_, value interface{}) bool {
+ if v, ok := value.(*ServiceHealthState); ok {
+ v.blackList.Range(func(key, value interface{}) bool {
+ v.blackList.Delete(key)
+ return true
+ })
+ }
return true
}
- invokerBlackList.Range(delete3)
+ serviceStateMap.Range(delete3)
+}
+
+// if the ip is changed in kubernetes, then the ip will not exist. So we
should recycle the invoker.
+// there are two ways :
+// 1. we should know when the invoker is dropped and clear the data from
blacklist
+// 2. we add a counter to collect the retry times. After 512 times(just now)
retry, we should clear it.
+type invokerState struct {
+ invoker Invoker
+ retryTimes int32
+}
+
+func newInvokeState(invoker Invoker) *invokerState {
+ return &invokerState{
+ invoker: invoker,
+ }
+}
+
+func (s *invokerState) increateRetryTimes() {
+ s.retryTimes++
}
// GetInvokerHealthyStatus get invoker's conn healthy status
func GetInvokerHealthyStatus(invoker Invoker) bool {
- _, found := invokerBlackList.Load(invoker.GetURL().Key())
- return !found
+ if v, ok := serviceStateMap.Load(invoker.GetURL().ServiceKey()); ok {
+ if state, ok := v.(*ServiceHealthState); ok {
+ _, found := state.blackList.Load(invoker.GetURL().Key())
+ return !found
+ }
+ }
+ return true
+}
+
+// nolint
+func GetAndRefreshState(url *common.URL) bool {
+ if v, ok := serviceStateMap.Load(url.ServiceKey()); ok {
+ if state, ok := v.(*ServiceHealthState); ok {
+ return atomic.CompareAndSwapInt32(state.rebuildRoute,
1, 0)
+ }
+ }
+ return false
+}
+
+type ServiceHealthState struct {
+ serviceKey string
+ //if some process in refresh
+ refreshState *int32
+ refresh atomic.Value
+ rebuildRoute *int32
+ blackList sync.Map // store unhealthy url blackList
+}
+
+func NewServiceState(serviceKey string) *ServiceHealthState {
+ if v, ok := serviceStateMap.Load(serviceKey); ok {
+ return v.(*ServiceHealthState)
+ }
+ serviceState := &ServiceHealthState{
+ refreshState: new(int32),
+ rebuildRoute: new(int32),
+ serviceKey: serviceKey,
+ }
+ serviceStateMap.Store(serviceKey, serviceState)
+ return serviceState
+}
+
+func (s *ServiceHealthState) reset() {
+ s.refresh.Store(false)
+ atomic.StoreInt32(s.rebuildRoute, 0)
+ s.blackList.Range(func(key, value interface{}) bool {
+ s.blackList.Delete(key)
+ return true
+ })
+}
+
+func (s *ServiceHealthState) configNeedRefresh(needRefresh bool) {
+ s.refresh.Store(needRefresh)
+}
+
+func (s *ServiceHealthState) needRefresh() bool {
+ v := s.refresh.Load()
+ if v == nil {
+ return false
+ }
+ return v.(bool)
}
// SetInvokerUnhealthyStatus add target invoker to black list
-func SetInvokerUnhealthyStatus(invoker Invoker) {
- invokerBlackList.Store(invoker.GetURL().Key(), invoker)
- logger.Info("Add invoker ip = ", invoker.GetURL().Location, " to black
list")
- blackListCacheDirty.Store(true)
+func (s *ServiceHealthState) SetInvokerUnhealthyStatus(invoker Invoker) {
+ s.configNeedRefresh(true)
+ s.blackList.Store(invoker.GetURL().Key(), newInvokeState(invoker))
+ logger.Infof("Add invoker ip=%s to black list for service(%s)",
invoker.GetURL().Location, invoker.GetURL().ServiceKey())
+ s.activeBlackListCacheDirty()
}
// RemoveInvokerUnhealthyStatus remove unhealthy status of target invoker from
blacklist
-func RemoveInvokerUnhealthyStatus(invoker Invoker) {
- invokerBlackList.Delete(invoker.GetURL().Key())
- logger.Info("Remove invoker ip = ", invoker.GetURL().Location, " from
black list")
- blackListCacheDirty.Store(true)
+func (s *ServiceHealthState) RemoveInvokerUnhealthyStatus(invoker Invoker) {
+ s.blackList.Delete(invoker.GetURL().Key())
+ logger.Infof("Remove invoker ip(%s) from black list for service(%s)",
invoker.GetURL().Location, invoker.GetURL().ServiceKey())
+ s.activeBlackListCacheDirty()
}
// GetBlackListInvokers get at most size of blockSize invokers from black list
-func GetBlackListInvokers(blockSize int) []Invoker {
- resultIvks := make([]Invoker, 0, 16)
- invokerBlackList.Range(func(k, v interface{}) bool {
- resultIvks = append(resultIvks, v.(Invoker))
+func (s *ServiceHealthState) GetBlackListInvokers(blockSize int)
[]*invokerState {
+ resultIvks := make([]*invokerState, 0, blockSize)
+ s.blackList.Range(func(k, v interface{}) bool {
+ if v == nil {
+ return true
+ }
+ resultIvks = append(resultIvks, v.(*invokerState))
+ if len(resultIvks) == blockSize {
+ return false
+ }
return true
})
- if blockSize > len(resultIvks) {
- return resultIvks
- }
- return resultIvks[:blockSize]
+ return resultIvks
}
// RemoveUrlKeyUnhealthyStatus called when event of provider unregister,
delete from black list
-func RemoveUrlKeyUnhealthyStatus(key string) {
- invokerBlackList.Delete(key)
+func (s *ServiceHealthState) RemoveUrlKeyUnhealthyStatus(key string) {
+ if _, ok := s.blackList.Load(key); ok {
+ s.blackList.Delete(key)
+ s.activeBlackListCacheDirty()
+ }
logger.Info("Remove invoker key = ", key, " from black list")
- blackListCacheDirty.Store(true)
}
-func GetAndRefreshState() bool {
- state := blackListCacheDirty.Load()
- blackListCacheDirty.Store(false)
- return state
+func (s *ServiceHealthState) activeBlackListCacheDirty() {
+ atomic.StoreInt32(s.rebuildRoute, 1)
}
// TryRefreshBlackList start 3 gr to check at most block=16 invokers in black
list
// if target invoker is available, then remove it from black list
-func TryRefreshBlackList() {
- if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) {
- go func() {
- wg := sync.WaitGroup{}
- defer func() {
-
atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0)
- }()
-
- ivks :=
GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
- logger.Debug("blackList len = ", len(ivks))
-
- for i := 0; i < 3; i++ {
- wg.Add(1)
- go func(ivks []Invoker, i int) {
- defer wg.Done()
- for j, _ := range ivks {
- if j%3-i == 0 &&
ivks[j].(Invoker).IsAvailable() {
-
RemoveInvokerUnhealthyStatus(ivks[i])
- }
- }
- }(ivks, i)
- }
- wg.Wait()
+func (s *ServiceHealthState) TryRefreshBlackList() {
+ if s.needRefresh() {
+ go s.refreshBlackList()
+ }
+}
+
+func (s *ServiceHealthState) refreshBlackList() {
+ defer func() {
+ if r := recover(); r != nil {
+ logger.Errorf("try to refresh black list failed: %s,
%+v", s.serviceKey, r)
+ }
+ }()
+
+ if atomic.CompareAndSwapInt32(s.refreshState, 0, 1) {
+ defer func() {
+ atomic.CompareAndSwapInt32(s.refreshState, 1, 0)
}()
+
+ ivkStates :=
s.GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
+ logger.Debug("blackList len = ", len(ivkStates))
+ if len(ivkStates) == 0 {
+ logger.Infof("there is no data in black list(%s), and
will not refresh black list.", s.serviceKey)
+ s.configNeedRefresh(false)
+ return
+ }
+ wg := sync.WaitGroup{}
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func(ivks []*invokerState, i int) {
+ defer wg.Done()
+ for j, _ := range ivks {
Review comment:
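The blank identifier is redundant here: write `for j := range ivks` instead of `for j, _ := range ivks` (this is the simplification `gofmt -s` applies).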
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]