Copilot commented on code in PR #3387:
URL: https://github.com/apache/dubbo-go/pull/3387#discussion_r3384855709


##########
registry/etcdv3/registry.go:
##########
@@ -99,8 +104,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, 
error) {
 // InitListeners init listeners of etcd registry center
 func (r *etcdV3Registry) InitListeners() {
        r.listener = etcdv3.NewEventListener(r.client)
-       r.configListener = NewConfigurationListener(r)
-       r.dataListener = NewRegistryDataListener(r.configListener)
+       newDataListener := NewRegistryDataListener()
+       if r.dataListener != nil {
+               oldDataListener := r.dataListener
+               oldDataListener.mutex.Lock()
+               defer oldDataListener.mutex.Unlock()
+               oldDataListener.closed = true
+               for _, oldListener := range oldDataListener.subscribed {
+                       etcdListener, ok := oldListener.(*configurationListener)
+                       if !ok || etcdListener == nil || 
etcdListener.subscribeURL == nil {
+                               continue
+                       }
+                       etcdListener.Close()
+                       newListener := NewConfigurationListener(r, 
etcdListener.subscribeURL)
+                       newDataListener.SubscribeURL(etcdListener.subscribeURL, 
newListener)
+                       go listenServiceEvent(r.listener, 
etcdProviderPath(etcdListener.subscribeURL), newDataListener)
+               }
+       }
+       r.dataListener = newDataListener

Review Comment:
   In InitListeners(), newDataListener is populated via SubscribeURL() inside 
the same loop that starts the listenServiceEvent goroutines, and those 
goroutines may concurrently call newDataListener.DataChange() (which reads 
l.subscribed under l.mutex). Because SubscribeURL() itself does not take 
l.mutex, the map can be written while another goroutine is reading it. During 
recovery this can trigger Go's unrecoverable "concurrent map read and map 
write" runtime fatal error.
   
   Lock newDataListener.mutex for the duration of the recovery population (or 
otherwise ensure goroutines cannot call DataChange until after SubscribeURL() 
calls are done).



##########
registry/etcdv3/listener.go:
##########
@@ -65,66 +84,108 @@ func (l *dataListener) DataChange(eventType 
remoting.Event) bool {
                return false
        }
 
-       for _, v := range l.interestedURL {
-               if serviceURL.URLEqual(v) {
-                       l.listener.Process(
+       l.mutex.Lock()
+       defer l.mutex.Unlock()
+       if l.closed {
+               return false
+       }
+       match := false
+       for serviceKey, listener := range l.subscribed {
+               intf, group, version := common.ParseServiceKey(serviceKey)
+               if serviceURL.ServiceKey() == serviceKey || 
common.IsAnyCondition(intf, group, version, serviceURL) {
+                       listener.Process(
                                &config_center.ConfigChangeEvent{
                                        Key:        eventType.Path,
-                                       Value:      serviceURL,
+                                       Value:      serviceURL.Clone(),
                                        ConfigType: eventType.Action,
                                },
                        )
-                       return true
+                       match = true
                }
        }
-       return false
+       return match
+}
+
+// Close closes all subscribed configuration listeners.
+func (l *dataListener) Close() {
+       l.mutex.Lock()
+       defer l.mutex.Unlock()
+       l.closed = true
+       for _, listener := range l.subscribed {
+               listener.(*configurationListener).Close()
+       }

Review Comment:
   dataListener.Close() also unconditionally type-asserts each subscribed 
listener to *configurationListener. If any non-*configurationListener entry 
exists (InitListeners already anticipates this possibility), Close() will panic.
   
   Guard the type assertion (or close via an interface) to keep Close() safe 
even when the map contains unexpected listener implementations.



##########
registry/etcdv3/listener.go:
##########
@@ -65,66 +84,108 @@ func (l *dataListener) DataChange(eventType 
remoting.Event) bool {
                return false
        }
 
-       for _, v := range l.interestedURL {
-               if serviceURL.URLEqual(v) {
-                       l.listener.Process(
+       l.mutex.Lock()
+       defer l.mutex.Unlock()
+       if l.closed {
+               return false
+       }
+       match := false
+       for serviceKey, listener := range l.subscribed {
+               intf, group, version := common.ParseServiceKey(serviceKey)
+               if serviceURL.ServiceKey() == serviceKey || 
common.IsAnyCondition(intf, group, version, serviceURL) {
+                       listener.Process(
                                &config_center.ConfigChangeEvent{
                                        Key:        eventType.Path,
-                                       Value:      serviceURL,
+                                       Value:      serviceURL.Clone(),
                                        ConfigType: eventType.Action,
                                },
                        )
-                       return true
+                       match = true
                }
        }
-       return false
+       return match
+}
+
+// Close closes all subscribed configuration listeners.
+func (l *dataListener) Close() {
+       l.mutex.Lock()
+       defer l.mutex.Unlock()
+       l.closed = true
+       for _, listener := range l.subscribed {
+               listener.(*configurationListener).Close()
+       }
 }
 
 type configurationListener struct {
-       registry  *etcdV3Registry
-       events    *gxchan.UnboundedChan
-       closeOnce sync.Once
+       registry     *etcdV3Registry
+       events       *gxchan.UnboundedChan
+       isClosed     bool
+       close        chan struct{}
+       closeOnce    sync.Once
+       subscribeURL *common.URL
 }
 
 // NewConfigurationListener for listening the event of etcdv3.
-func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
+func NewConfigurationListener(reg *etcdV3Registry, conf *common.URL) 
*configurationListener {
        // add a new waiter
        reg.WaitGroup().Add(1)
-       return &configurationListener{registry: reg, events: 
gxchan.NewUnboundedChan(32)}
+       return &configurationListener{
+               registry:     reg,
+               events:       gxchan.NewUnboundedChan(32),
+               close:        make(chan struct{}),
+               subscribeURL: conf,
+       }
 }
 
 // Process data change event from config center of etcd
 func (l *configurationListener) Process(configType 
*config_center.ConfigChangeEvent) {
+       select {
+       case <-l.close:
+               return
+       default:
+       }
        l.events.In() <- configType
 }
 
 // Next returns next service event once received
 func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
        for {
                select {
+               case <-l.close:
+                       return nil, perrors.New("listener has been closed")
+
                case <-l.registry.Done():
                        logger.Warn("[Registry][Etcdv3] listener's etcd client 
connection is broken, so etcd event listener exit now")
                        return nil, perrors.New("listener stopped")
 
                case val := <-l.events.Out():
                        e, _ := val.(*config_center.ConfigChangeEvent)
                        logger.Infof("[Registry][Etcdv3] got etcd event %#v", e)
-                       if e.ConfigType == remoting.EventTypeDel && 
l.registry.client.Valid() {
-                               select {
-                               case <-l.registry.Done():
-                                       logger.Warnf("[Registry][Etcdv3] update 
@result{%s}. But its connection to registry is invalid", e.Value)
-                               default:
-                               }
+                       if l.shouldIgnoreDeleteEvent(e) {
                                continue
                        }
                        return &registry.ServiceEvent{Action: e.ConfigType, 
Service: e.Value.(*common.URL)}, nil
                }
        }
 }
 
+func (l *configurationListener) shouldIgnoreDeleteEvent(e 
*config_center.ConfigChangeEvent) bool {
+       if e.ConfigType != remoting.EventTypeDel || l.registry.client == nil || 
!validEtcdClient(l.registry.client) {
+               return false
+       }
+       select {

Review Comment:
   shouldIgnoreDeleteEvent() currently returns true (i.e., ignores the event) 
whenever the event is a delete *and* the etcd client is valid. As a result, 
every delete event is discarded while the connection is healthy. This looks 
like inverted logic: the ZooKeeper registry ignores delete events when its 
client/connection is invalid, not when it is valid.
   
   If the intent is to ignore spurious delete events during 
disconnect/reconnect, base the condition on an invalid client (or on the 
connection state), and update the associated tests to match.



##########
registry/etcdv3/listener.go:
##########
@@ -37,18 +37,37 @@ import (
 )
 
 type dataListener struct {
-       interestedURL []*common.URL
-       listener      config_center.ConfigurationListener
+       subscribed map[string]config_center.ConfigurationListener
+       mutex      sync.Mutex
+       closed     bool
 }
 
 // NewRegistryDataListener creates a data listener for etcd
-func NewRegistryDataListener(listener config_center.ConfigurationListener) 
*dataListener {
-       return &dataListener{listener: listener}
+func NewRegistryDataListener() *dataListener {
+       return &dataListener{
+               subscribed: 
make(map[string]config_center.ConfigurationListener),
+       }
 }
 
-// AddInterestedURL adds a registration @url to listen
-func (l *dataListener) AddInterestedURL(url *common.URL) {
-       l.interestedURL = append(l.interestedURL, url)
+// SubscribeURL sets a watch listener for url.
+func (l *dataListener) SubscribeURL(url *common.URL, listener 
config_center.ConfigurationListener) {
+       if l.closed {
+               return
+       }
+       l.subscribed[url.ServiceKey()] = listener
+}
+
+// UnSubscribeURL unsets a watch listener for url.
+func (l *dataListener) UnSubscribeURL(url *common.URL) 
config_center.ConfigurationListener {
+       if l.closed {
+               return nil
+       }
+       listener := l.subscribed[url.ServiceKey()]
+       if listener != nil {
+               listener.(*configurationListener).Close()
+               delete(l.subscribed, url.ServiceKey())
+       }

Review Comment:
   UnSubscribeURL() assumes every entry in l.subscribed is a 
*configurationListener and unconditionally type-asserts before calling Close(). 
However, InitListeners() explicitly guards against non-*configurationListener 
entries in the subscribed map, and tests also insert a different listener type. 
If such an entry is ever unsubscribed, this will panic.
   
   Use a safe type assertion (or an interface check for a Close() method) 
before closing.



##########
registry/etcdv3/registry.go:
##########
@@ -158,14 +195,37 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) 
(registry.Listener, error)
        }
 
        // register the svc to dataListener
-       r.dataListener.AddInterestedURL(svc)
-       go 
r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory,
 svc.Service()), r.dataListener)
+       configListener := NewConfigurationListener(r, svc)
+       r.dataListener.SubscribeURL(svc, configListener)
+       go listenServiceEvent(r.listener, etcdProviderPath(svc), r.dataListener)
 
        return configListener, nil
 }
 
 func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, 
error) {
-       return nil, perrors.New("DoUnsubscribe is not support in 
etcdV3Registry")
+       if r.dataListener == nil {
+               return nil, perrors.New("etcd data listener is nil, can not 
close")
+       }
+       r.dataListener.mutex.Lock()
+       subscribedListener := r.dataListener.subscribed[conf.ServiceKey()]
+       if subscribedListener != nil {
+               etcdListener, _ := subscribedListener.(*configurationListener)
+               if etcdListener != nil && etcdListener.isClosed {
+                       r.dataListener.mutex.Unlock()
+                       return nil, perrors.Errorf("configListener for service 
%s has already been closed", conf.ServiceKey())
+               }
+       }
+       listener := r.dataListener.UnSubscribeURL(conf)
+       r.dataListener.mutex.Unlock()
+
+       if r.listener == nil {
+               return nil, perrors.New("etcd event listener is nil, can not 
close")
+       }
+
+       if listener == nil {
+               return nil, nil
+       }
+       return listener.(registry.Listener), nil

Review Comment:
   DoUnsubscribe uses an unchecked, single-value type assertion on the removed 
entry (listener.(registry.Listener)). The preceding nil check handles a missing 
entry. However, the subscribed map may contain a 
config_center.ConfigurationListener that does not also implement 
registry.Listener, and in that case this assertion will panic.
   
   Prefer a checked type assertion (v, ok := listener.(registry.Listener)) and 
return a descriptive error when the stored listener has an unexpected type.



##########
registry/etcdv3/registry.go:
##########
@@ -158,14 +195,37 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) 
(registry.Listener, error)
        }
 
        // register the svc to dataListener
-       r.dataListener.AddInterestedURL(svc)
-       go 
r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory,
 svc.Service()), r.dataListener)
+       configListener := NewConfigurationListener(r, svc)
+       r.dataListener.SubscribeURL(svc, configListener)
+       go listenServiceEvent(r.listener, etcdProviderPath(svc), r.dataListener)
 
        return configListener, nil
 }
 
 func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, 
error) {
-       return nil, perrors.New("DoUnsubscribe is not support in 
etcdV3Registry")
+       if r.dataListener == nil {
+               return nil, perrors.New("etcd data listener is nil, can not 
close")
+       }
+       r.dataListener.mutex.Lock()
+       subscribedListener := r.dataListener.subscribed[conf.ServiceKey()]
+       if subscribedListener != nil {
+               etcdListener, _ := subscribedListener.(*configurationListener)
+               if etcdListener != nil && etcdListener.isClosed {
+                       r.dataListener.mutex.Unlock()
+                       return nil, perrors.Errorf("configListener for service 
%s has already been closed", conf.ServiceKey())
+               }
+       }

Review Comment:
   DoUnsubscribe also reads configurationListener.isClosed without 
synchronization, so it can race with a concurrent Close() call. Determine the 
closed state from the close channel instead (e.g. a non-blocking select on 
it), which is safe to do concurrently and avoids the data race.



##########
registry/etcdv3/registry.go:
##########
@@ -142,9 +167,21 @@ func (r *etcdV3Registry) CreatePath(k string) error {
 
 // DoSubscribe actually subscribe the provider URL
 func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, 
error) {
-       r.listenerLock.RLock()
-       configListener := r.configListener
-       r.listenerLock.RUnlock()
+       if r.dataListener == nil {
+               r.dataListener = NewRegistryDataListener()
+       }
+       r.dataListener.mutex.Lock()
+       defer r.dataListener.mutex.Unlock()
+       if listener := r.dataListener.subscribed[svc.ServiceKey()]; listener != 
nil {
+               etcdListener, _ := listener.(*configurationListener)
+               if etcdListener != nil {
+                       if etcdListener.isClosed {
+                               return nil, perrors.New("configListener already 
been closed")
+                       }
+                       return etcdListener, nil
+               }

Review Comment:
   DoSubscribe checks configurationListener.isClosed to decide whether an 
existing listener can be reused. isClosed is written in 
configurationListener.Close() without synchronization, so if a caller closes 
the returned listener concurrently with another goroutine calling DoSubscribe, 
this introduces a data race.
   
   Prefer checking the close channel (which is safe to read concurrently) 
rather than a plain bool.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to