Copilot commented on code in PR #3387:
URL: https://github.com/apache/dubbo-go/pull/3387#discussion_r3384855709
##########
registry/etcdv3/registry.go:
##########
@@ -99,8 +104,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry,
error) {
// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client)
- r.configListener = NewConfigurationListener(r)
- r.dataListener = NewRegistryDataListener(r.configListener)
+ newDataListener := NewRegistryDataListener()
+ if r.dataListener != nil {
+ oldDataListener := r.dataListener
+ oldDataListener.mutex.Lock()
+ defer oldDataListener.mutex.Unlock()
+ oldDataListener.closed = true
+ for _, oldListener := range oldDataListener.subscribed {
+ etcdListener, ok := oldListener.(*configurationListener)
+ if !ok || etcdListener == nil ||
etcdListener.subscribeURL == nil {
+ continue
+ }
+ etcdListener.Close()
+ newListener := NewConfigurationListener(r,
etcdListener.subscribeURL)
+ newDataListener.SubscribeURL(etcdListener.subscribeURL,
newListener)
+ go listenServiceEvent(r.listener,
etcdProviderPath(etcdListener.subscribeURL), newDataListener)
+ }
+ }
+ r.dataListener = newDataListener
Review Comment:
In InitListeners(), newDataListener is mutated via SubscribeURL() while
ListenServiceEvent goroutines are started that may concurrently call
newDataListener.DataChange() (which reads l.subscribed under l.mutex). Because
SubscribeURL() itself does not take l.mutex, this can lead to concurrent map
read/write panics during recovery.
Lock newDataListener.mutex for the duration of the recovery population (or
otherwise ensure goroutines cannot call DataChange until after SubscribeURL()
calls are done).
##########
registry/etcdv3/listener.go:
##########
@@ -65,66 +84,108 @@ func (l *dataListener) DataChange(eventType
remoting.Event) bool {
return false
}
- for _, v := range l.interestedURL {
- if serviceURL.URLEqual(v) {
- l.listener.Process(
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ if l.closed {
+ return false
+ }
+ match := false
+ for serviceKey, listener := range l.subscribed {
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if serviceURL.ServiceKey() == serviceKey ||
common.IsAnyCondition(intf, group, version, serviceURL) {
+ listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
- Value: serviceURL,
+ Value: serviceURL.Clone(),
ConfigType: eventType.Action,
},
)
- return true
+ match = true
}
}
- return false
+ return match
+}
+
+// Close closes all subscribed configuration listeners.
+func (l *dataListener) Close() {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ l.closed = true
+ for _, listener := range l.subscribed {
+ listener.(*configurationListener).Close()
+ }
Review Comment:
dataListener.Close() also unconditionally type-asserts each subscribed
listener to *configurationListener. If any non-*configurationListener entry
exists (InitListeners already anticipates this possibility), Close() will panic.
Guard the type assertion (or close via an interface) to keep Close() safe
even when the map contains unexpected listener implementations.
##########
registry/etcdv3/listener.go:
##########
@@ -65,66 +84,108 @@ func (l *dataListener) DataChange(eventType
remoting.Event) bool {
return false
}
- for _, v := range l.interestedURL {
- if serviceURL.URLEqual(v) {
- l.listener.Process(
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ if l.closed {
+ return false
+ }
+ match := false
+ for serviceKey, listener := range l.subscribed {
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if serviceURL.ServiceKey() == serviceKey ||
common.IsAnyCondition(intf, group, version, serviceURL) {
+ listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
- Value: serviceURL,
+ Value: serviceURL.Clone(),
ConfigType: eventType.Action,
},
)
- return true
+ match = true
}
}
- return false
+ return match
+}
+
+// Close closes all subscribed configuration listeners.
+func (l *dataListener) Close() {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+ l.closed = true
+ for _, listener := range l.subscribed {
+ listener.(*configurationListener).Close()
+ }
}
type configurationListener struct {
- registry *etcdV3Registry
- events *gxchan.UnboundedChan
- closeOnce sync.Once
+ registry *etcdV3Registry
+ events *gxchan.UnboundedChan
+ isClosed bool
+ close chan struct{}
+ closeOnce sync.Once
+ subscribeURL *common.URL
}
// NewConfigurationListener for listening the event of etcdv3.
-func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
+func NewConfigurationListener(reg *etcdV3Registry, conf *common.URL)
*configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
- return &configurationListener{registry: reg, events:
gxchan.NewUnboundedChan(32)}
+ return &configurationListener{
+ registry: reg,
+ events: gxchan.NewUnboundedChan(32),
+ close: make(chan struct{}),
+ subscribeURL: conf,
+ }
}
// Process data change event from config center of etcd
func (l *configurationListener) Process(configType
*config_center.ConfigChangeEvent) {
+ select {
+ case <-l.close:
+ return
+ default:
+ }
l.events.In() <- configType
}
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
+ case <-l.close:
+ return nil, perrors.New("listener has been closed")
+
case <-l.registry.Done():
logger.Warn("[Registry][Etcdv3] listener's etcd client
connection is broken, so etcd event listener exit now")
return nil, perrors.New("listener stopped")
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Infof("[Registry][Etcdv3] got etcd event %#v", e)
- if e.ConfigType == remoting.EventTypeDel &&
l.registry.client.Valid() {
- select {
- case <-l.registry.Done():
- logger.Warnf("[Registry][Etcdv3] update
@result{%s}. But its connection to registry is invalid", e.Value)
- default:
- }
+ if l.shouldIgnoreDeleteEvent(e) {
continue
}
return ®istry.ServiceEvent{Action: e.ConfigType,
Service: e.Value.(*common.URL)}, nil
}
}
}
+func (l *configurationListener) shouldIgnoreDeleteEvent(e
*config_center.ConfigChangeEvent) bool {
+ if e.ConfigType != remoting.EventTypeDel || l.registry.client == nil ||
!validEtcdClient(l.registry.client) {
+ return false
+ }
+ select {
Review Comment:
shouldIgnoreDeleteEvent() currently returns true (i.e., ignores the event)
whenever the event is a delete *and* the etcd client is valid. This effectively
discards all delete events under healthy connections, which is likely inverted
logic (zookeeper ignores delete events when the client/connection is invalid).
If the intent is to ignore spurious delete events during
disconnect/reconnect, the condition should be based on an invalid client (or
connection state), and the associated tests should be updated accordingly.
##########
registry/etcdv3/listener.go:
##########
@@ -37,18 +37,37 @@ import (
)
type dataListener struct {
- interestedURL []*common.URL
- listener config_center.ConfigurationListener
+ subscribed map[string]config_center.ConfigurationListener
+ mutex sync.Mutex
+ closed bool
}
// NewRegistryDataListener creates a data listener for etcd
-func NewRegistryDataListener(listener config_center.ConfigurationListener)
*dataListener {
- return &dataListener{listener: listener}
+func NewRegistryDataListener() *dataListener {
+ return &dataListener{
+ subscribed:
make(map[string]config_center.ConfigurationListener),
+ }
}
-// AddInterestedURL adds a registration @url to listen
-func (l *dataListener) AddInterestedURL(url *common.URL) {
- l.interestedURL = append(l.interestedURL, url)
+// SubscribeURL sets a watch listener for url.
+func (l *dataListener) SubscribeURL(url *common.URL, listener
config_center.ConfigurationListener) {
+ if l.closed {
+ return
+ }
+ l.subscribed[url.ServiceKey()] = listener
+}
+
+// UnSubscribeURL unsets a watch listener for url.
+func (l *dataListener) UnSubscribeURL(url *common.URL)
config_center.ConfigurationListener {
+ if l.closed {
+ return nil
+ }
+ listener := l.subscribed[url.ServiceKey()]
+ if listener != nil {
+ listener.(*configurationListener).Close()
+ delete(l.subscribed, url.ServiceKey())
+ }
Review Comment:
UnSubscribeURL() assumes every entry in l.subscribed is a
*configurationListener and unconditionally type-asserts before calling Close().
However, InitListeners() explicitly guards against non-*configurationListener
entries in the subscribed map, and tests also insert a different listener type.
If such an entry is ever unsubscribed, this will panic.
Use a safe type assertion (or an interface check for a Close() method)
before closing.
##########
registry/etcdv3/registry.go:
##########
@@ -158,14 +195,37 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL)
(registry.Listener, error)
}
// register the svc to dataListener
- r.dataListener.AddInterestedURL(svc)
- go
r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory,
svc.Service()), r.dataListener)
+ configListener := NewConfigurationListener(r, svc)
+ r.dataListener.SubscribeURL(svc, configListener)
+ go listenServiceEvent(r.listener, etcdProviderPath(svc), r.dataListener)
return configListener, nil
}
func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener,
error) {
- return nil, perrors.New("DoUnsubscribe is not support in
etcdV3Registry")
+ if r.dataListener == nil {
+ return nil, perrors.New("etcd data listener is nil, can not
close")
+ }
+ r.dataListener.mutex.Lock()
+ subscribedListener := r.dataListener.subscribed[conf.ServiceKey()]
+ if subscribedListener != nil {
+ etcdListener, _ := subscribedListener.(*configurationListener)
+ if etcdListener != nil && etcdListener.isClosed {
+ r.dataListener.mutex.Unlock()
+ return nil, perrors.Errorf("configListener for service
%s has already been closed", conf.ServiceKey())
+ }
+ }
+ listener := r.dataListener.UnSubscribeURL(conf)
+ r.dataListener.mutex.Unlock()
+
+ if r.listener == nil {
+ return nil, perrors.New("etcd event listener is nil, can not
close")
+ }
+
+ if listener == nil {
+ return nil, nil
+ }
+ return listener.(registry.Listener), nil
Review Comment:
DoUnsubscribe unconditionally casts the removed entry to registry.Listener
(listener.(registry.Listener)). If the subscribed map ever contains a
config_center.ConfigurationListener that doesn't also implement
registry.Listener (or a nil/unknown type slips in), this will panic.
Prefer a checked type assertion and return a descriptive error when the
stored listener has an unexpected type.
##########
registry/etcdv3/registry.go:
##########
@@ -158,14 +195,37 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL)
(registry.Listener, error)
}
// register the svc to dataListener
- r.dataListener.AddInterestedURL(svc)
- go
r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory,
svc.Service()), r.dataListener)
+ configListener := NewConfigurationListener(r, svc)
+ r.dataListener.SubscribeURL(svc, configListener)
+ go listenServiceEvent(r.listener, etcdProviderPath(svc), r.dataListener)
return configListener, nil
}
func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener,
error) {
- return nil, perrors.New("DoUnsubscribe is not support in
etcdV3Registry")
+ if r.dataListener == nil {
+ return nil, perrors.New("etcd data listener is nil, can not
close")
+ }
+ r.dataListener.mutex.Lock()
+ subscribedListener := r.dataListener.subscribed[conf.ServiceKey()]
+ if subscribedListener != nil {
+ etcdListener, _ := subscribedListener.(*configurationListener)
+ if etcdListener != nil && etcdListener.isClosed {
+ r.dataListener.mutex.Unlock()
+ return nil, perrors.Errorf("configListener for service
%s has already been closed", conf.ServiceKey())
+ }
+ }
Review Comment:
DoUnsubscribe also reads configurationListener.isClosed, which can race with
concurrent Close() calls. Use the close channel to determine closed state
safely without introducing a data race.
##########
registry/etcdv3/registry.go:
##########
@@ -142,9 +167,21 @@ func (r *etcdV3Registry) CreatePath(k string) error {
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener,
error) {
- r.listenerLock.RLock()
- configListener := r.configListener
- r.listenerLock.RUnlock()
+ if r.dataListener == nil {
+ r.dataListener = NewRegistryDataListener()
+ }
+ r.dataListener.mutex.Lock()
+ defer r.dataListener.mutex.Unlock()
+ if listener := r.dataListener.subscribed[svc.ServiceKey()]; listener !=
nil {
+ etcdListener, _ := listener.(*configurationListener)
+ if etcdListener != nil {
+ if etcdListener.isClosed {
+ return nil, perrors.New("configListener already
been closed")
+ }
+ return etcdListener, nil
+ }
Review Comment:
DoSubscribe checks configurationListener.isClosed to decide whether an
existing listener can be reused. isClosed is written in
configurationListener.Close() without synchronization, so if a caller closes
the returned listener concurrently with another goroutine calling DoSubscribe,
this introduces a data race.
Prefer checking the close channel (which is safe to read concurrently)
rather than a plain bool.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]