justxuewei commented on code in PR #2267:
URL: https://github.com/apache/dubbo-go/pull/2267#discussion_r1155984607


##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {

Review Comment:
   ```suggestion
   func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener 
remoting.DataListener) {
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }

Review Comment:
   ```go
   if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL)); err == nil {
       ttl = timeout
   } else {
       logger.Warnf
   }
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -279,7 +363,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, 
zkRootPath string, li
                        // Only need to compare Path when subscribing to 
provider
                        if strings.LastIndex(zkRootPath, 
constant.ProviderCategory) != -1 {
                                provider, _ := common.NewURL(c)
-                               if provider.ServiceKey() != conf.ServiceKey() {
+                               if provider.Interface() != intf || conf.Group() 
!= constant.AnyValue && conf.Group() != provider.Group() ||
+                                       conf.Version() != constant.AnyValue && 
conf.Version() != provider.Version() {

Review Comment:
   Use `is_any_condition()` instead.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"

Review Comment:
   ```suggestion
   // listenAllDirEvents listens to all services when conf.InterfaceKey = "*"
   ```



##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event 
remoting.Event) bool {
                                        ConfigType: event.Action,
                                },
                        )
-                       return true
+                       match = true
                }
+
+               // AnyValue condition
+               intf, group, version := common.ParseServiceKey(serviceKey)
+               if intf != constant.AnyValue || group != constant.AnyValue && 
group != serviceURL.Group() ||
+                       version != constant.AnyValue && version != 
serviceURL.Group() {
+                       continue

Review Comment:
   I think you should wrap this condition into a helper method named 
`is_any_condition()` and add some unit tests for it.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // Maybe the zookeeper does not ready yet, sleep 
failTimes * ConnDelay senconds to wait
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] Can not gey any children for the path {%s}, 
please check if the provider does ready.", rootPath)
+               }
+               for _, c := range children {
+                       // Build the children path
+                       zkRootPath := path.Join(rootPath, 
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, 
constant.ProvidersCategory)
+                       // Save the path to avoid listen repeatedly
+                       l.pathMapLock.Lock()
+                       _, ok := l.pathMap[zkRootPath]
+                       if !ok {
+                               l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+                       }
+                       l.pathMapLock.Unlock()

Review Comment:
   Remove this.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // Maybe the zookeeper does not ready yet, sleep 
failTimes * ConnDelay senconds to wait
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] Can not gey any children for the path {%s}, 
please check if the provider does ready.", rootPath)
+               }
+               for _, c := range children {
+                       // Build the children path
+                       zkRootPath := path.Join(rootPath, 
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, 
constant.ProvidersCategory)
+                       // Save the path to avoid listen repeatedly
+                       l.pathMapLock.Lock()

Review Comment:
   ```suggestion
                        l.pathMapLock.Lock()
                        defer l.pathMapLock.Unlock()
   ```



##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event 
remoting.Event) bool {
                                        ConfigType: event.Action,
                                },
                        )
-                       return true
+                       match = true
                }
+
+               // AnyValue condition
+               intf, group, version := common.ParseServiceKey(serviceKey)
+               if intf != constant.AnyValue || group != constant.AnyValue && 
group != serviceURL.Group() ||
+                       version != constant.AnyValue && version != 
serviceURL.Group() {
+                       continue
+               }
+               listener.Process(
+                       &config_center.ConfigChangeEvent{
+                               Key:        event.Path,
+                               Value:      serviceURL.Clone(),
+                               ConfigType: event.Action,
+                       },
+               )
+               match = true
        }
-       return false
+       return match

Review Comment:
   `listener.Process()` is called twice here, but the two calls could be merged 
into a single one:
   
   ```
   if (match service key) || (match any condition) {
       listener.Process()
       match = true
   }
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // Maybe the zookeeper does not ready yet, sleep 
failTimes * ConnDelay senconds to wait
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] Can not gey any children for the path {%s}, 
please check if the provider does ready.", rootPath)
+               }
+               for _, c := range children {
+                       // Build the children path
+                       zkRootPath := path.Join(rootPath, 
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, 
constant.ProvidersCategory)
+                       // Save the path to avoid listen repeatedly
+                       l.pathMapLock.Lock()
+                       _, ok := l.pathMap[zkRootPath]
+                       if !ok {
+                               l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+                       }
+                       l.pathMapLock.Unlock()
+                       if ok {
+                               logger.Warnf("[Zookeeper 
EventListener][listenDirEvent] The child with zk path {%s} has already been 
listened.", zkRootPath)
+                               continue
+                       }

Review Comment:
   ```go
   if _, ok := l.pathMap[zkRootPath]; ok {
       logger.Warnf
   } else {
       l.pathMap[zkRootPath] = uatomic.NewInt32(0)
   }
   ```



##########
common/url.go:
##########
@@ -413,6 +413,31 @@ func ServiceKey(intf string, group string, version string) 
string {
        return buf.String()
 }
 
+// ParseServiceKey get interface, group and version from servicekey

Review Comment:
   ```suggestion
   // ParseServiceKey gets interface, group and version from service key.
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // Maybe the zookeeper does not ready yet, sleep 
failTimes * ConnDelay senconds to wait
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] Can not gey any children for the path {%s}, 
please check if the provider does ready.", rootPath)
+               }
+               for _, c := range children {
+                       // Build the children path

Review Comment:
   ```suggestion
                        // Build the child path
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL))
+               if err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // Maybe the zookeeper does not ready yet, sleep 
failTimes * ConnDelay senconds to wait
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] Can not gey any children for the path {%s}, 
please check if the provider does ready.", rootPath)

Review Comment:
   ```suggestion
                        logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Can not get any child from the path \"%s\", please check if the provider is 
ready.", rootPath)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to