justxuewei commented on code in PR #2267:
URL: https://github.com/apache/dubbo-go/pull/2267#discussion_r1155984607
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
Review Comment:
```suggestion
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener
remoting.DataListener) {
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
Review Comment:
```go
if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL)); err == nil {
ttl = timeout
} else {
logger.Warnf
}
```
##########
remoting/zookeeper/listener.go:
##########
@@ -279,7 +363,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL,
zkRootPath string, li
// Only need to compare Path when subscribing to
provider
if strings.LastIndex(zkRootPath,
constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
- if provider.ServiceKey() != conf.ServiceKey() {
+ if provider.Interface() != intf || conf.Group()
!= constant.AnyValue && conf.Group() != provider.Group() ||
+ conf.Version() != constant.AnyValue &&
conf.Version() != provider.Version() {
Review Comment:
Use `is_any_condition()` instead.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
Review Comment:
```suggestion
// listenAllDirEvents listens to all services when conf.InterfaceKey = "*"
```
##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event
remoting.Event) bool {
ConfigType: event.Action,
},
)
- return true
+ match = true
}
+
+ // AnyValue condition
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if intf != constant.AnyValue || group != constant.AnyValue &&
group != serviceURL.Group() ||
+ version != constant.AnyValue && version !=
serviceURL.Group() {
+ continue
Review Comment:
I think you should wrap these checks into a method named `is_any_condition()`
and add some unit tests for it.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper
EventListener][listenDirEvent] Get children of path {%s} with watcher failed,
the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep
failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes *
ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper
EventListener][listenDirEvent] Can not gey any children for the path {%s},
please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath,
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator,
constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
+ _, ok := l.pathMap[zkRootPath]
+ if !ok {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
Review Comment:
Remove this explicit `l.pathMapLock.Unlock()` call.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper
EventListener][listenDirEvent] Get children of path {%s} with watcher failed,
the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep
failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes *
ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper
EventListener][listenDirEvent] Can not gey any children for the path {%s},
please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath,
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator,
constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
Review Comment:
```suggestion
l.pathMapLock.Lock()
defer l.pathMapLock.Unlock()
```
##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event
remoting.Event) bool {
ConfigType: event.Action,
},
)
- return true
+ match = true
}
+
+ // AnyValue condition
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if intf != constant.AnyValue || group != constant.AnyValue &&
group != serviceURL.Group() ||
+ version != constant.AnyValue && version !=
serviceURL.Group() {
+ continue
+ }
+ listener.Process(
+ &config_center.ConfigChangeEvent{
+ Key: event.Path,
+ Value: serviceURL.Clone(),
+ ConfigType: event.Action,
+ },
+ )
+ match = true
}
- return false
+ return match
Review Comment:
`listener.Process()` is called in two places here. It could be reduced to a
single call:
```
if (match service key) || (match any condition) {
listener.Process()
match = true
}
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper
EventListener][listenDirEvent] Get children of path {%s} with watcher failed,
the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep
failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes *
ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper
EventListener][listenDirEvent] Can not gey any children for the path {%s},
please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath,
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator,
constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
+ _, ok := l.pathMap[zkRootPath]
+ if !ok {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
+ if ok {
+ logger.Warnf("[Zookeeper
EventListener][listenDirEvent] The child with zk path {%s} has already been
listened.", zkRootPath)
+ continue
+ }
Review Comment:
```go
if _, ok := l.pathMap[zkRootPath]; ok {
logger.Warnf
} else {
l.pathMap[zkRootPath] = uatomic.NewInt32(0)
}
```
##########
common/url.go:
##########
@@ -413,6 +413,31 @@ func ServiceKey(intf string, group string, version string)
string {
return buf.String()
}
+// ParseServiceKey get interface, group and version from servicekey
Review Comment:
```suggestion
// ParseServiceKey gets interface, group and version from service key.
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper
EventListener][listenDirEvent] Get children of path {%s} with watcher failed,
the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep
failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes *
ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper
EventListener][listenDirEvent] Can not gey any children for the path {%s},
please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
Review Comment:
```suggestion
// Build the child path
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string,
children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action:
remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string,
listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener
remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err :=
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey,
constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Wrong configuration for registry.ttl, error=%+v, using default value %v
instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper
EventListener][listenDirEvent] Get children of path {%s} with watcher failed,
the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep
failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes *
ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper
EventListener][listenDirEvent] Can not gey any children for the path {%s},
please check if the provider does ready.", rootPath)
Review Comment:
```suggestion
logger.Warnf("[Zookeeper EventListener][listenDirEvent]
Can not get any child from the path \"%s\", please check if the provider does
ready.", rootPath)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]