This is an automated email from the ASF dual-hosted git repository.

fangyc pushed a commit to branch 1.3
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.3 by this push:
     new 7fcb34e  add zk register code
     new 5f82281  Merge pull request #355 from pantianying/fix_zkproblemto1.3
7fcb34e is described below

commit 7fcb34eb5612a5ad47c24ca4bc7aecc194f801e4
Author: pantianying <601666...@qq.com>
AuthorDate: Sat Feb 8 12:34:14 2020 +0800

    add zk register code
---
 config_center/zookeeper/impl.go      |  63 +++---
 config_center/zookeeper/impl_test.go |   2 +-
 go.mod                               |   2 +-
 go.sum                               |   4 +-
 registry/base_register.go            | 375 +++++++++++++++++++++++++++++++++++
 registry/etcdv3/listener.go          |  10 +-
 registry/etcdv3/registry.go          | 264 +++---------------------
 registry/etcdv3/registry_test.go     |  14 +-
 registry/registry.go                 |  12 +-
 registry/zookeeper/listener.go       |  18 +-
 registry/zookeeper/registry.go       | 354 ++++-----------------------------
 registry/zookeeper/registry_test.go  |   4 +-
 remoting/etcdv3/client.go            |  32 ++-
 remoting/etcdv3/facade.go            |   7 +-
 remoting/etcdv3/listener.go          |   9 +-
 remoting/listener.go                 |   6 +
 remoting/zookeeper/client.go         | 128 ++++++++----
 remoting/zookeeper/client_test.go    |  11 +-
 remoting/zookeeper/facade.go         |   7 +-
 remoting/zookeeper/facade_test.go    |   4 +-
 remoting/zookeeper/listener.go       |  75 +++----
 remoting/zookeeper/listener_test.go  |   5 +-
 22 files changed, 689 insertions(+), 717 deletions(-)

diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index 504d491..70fb196 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -24,8 +24,8 @@ import (
 )
 
 import (
+       "github.com/dubbogo/go-zookeeper/zk"
        perrors "github.com/pkg/errors"
-       "github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -37,7 +37,11 @@ import (
        "github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
-const ZkClient = "zk config_center"
+const (
+       // ZkClient
+       //zookeeper client name
+       ZkClient = "zk config_center"
+)
 
 type zookeeperDynamicConfiguration struct {
        url      *common.URL
@@ -134,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key 
string, opts ...config
        content, _, err := c.client.GetContent(c.rootPath + "/" + key)
        if err != nil {
                return "", perrors.WithStack(err)
-       } else {
-               return string(content), nil
        }
 
+       return string(content), nil
 }
 
 //For zookeeper, getConfig and getConfigs have the same meaning.
@@ -156,57 +159,57 @@ func (c *zookeeperDynamicConfiguration) SetParser(p 
parser.ConfigurationParser)
        c.parser = p
 }
 
-func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
-       return r.client
+func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
+       return c.client
 }
 
-func (r *zookeeperDynamicConfiguration) SetZkClient(client 
*zookeeper.ZookeeperClient) {
-       r.client = client
+func (c *zookeeperDynamicConfiguration) SetZkClient(client 
*zookeeper.ZookeeperClient) {
+       c.client = client
 }
 
-func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
-       return &r.cltLock
+func (c *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
+       return &c.cltLock
 }
 
-func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
-       return &r.wg
+func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
+       return &c.wg
 }
 
-func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} {
-       return r.done
+func (c *zookeeperDynamicConfiguration) Done() chan struct{} {
+       return c.done
 }
 
-func (r *zookeeperDynamicConfiguration) GetUrl() common.URL {
-       return *r.url
+func (c *zookeeperDynamicConfiguration) GetUrl() common.URL {
+       return *c.url
 }
 
-func (r *zookeeperDynamicConfiguration) Destroy() {
-       if r.listener != nil {
-               r.listener.Close()
+func (c *zookeeperDynamicConfiguration) Destroy() {
+       if c.listener != nil {
+               c.listener.Close()
        }
-       close(r.done)
-       r.wg.Wait()
-       r.closeConfigs()
+       close(c.done)
+       c.wg.Wait()
+       c.closeConfigs()
 }
 
-func (r *zookeeperDynamicConfiguration) IsAvailable() bool {
+func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
        select {
-       case <-r.done:
+       case <-c.done:
                return false
        default:
                return true
        }
 }
 
-func (r *zookeeperDynamicConfiguration) closeConfigs() {
-       r.cltLock.Lock()
-       defer r.cltLock.Unlock()
+func (c *zookeeperDynamicConfiguration) closeConfigs() {
+       c.cltLock.Lock()
+       defer c.cltLock.Unlock()
        logger.Infof("begin to close provider zk client")
        // Close the old client first to close the tmp node
-       r.client.Close()
-       r.client = nil
+       c.client.Close()
+       c.client = nil
 }
 
-func (r *zookeeperDynamicConfiguration) RestartCallBack() bool {
+func (c *zookeeperDynamicConfiguration) RestartCallBack() bool {
        return true
 }
diff --git a/config_center/zookeeper/impl_test.go 
b/config_center/zookeeper/impl_test.go
index e614009..cca4427 100644
--- a/config_center/zookeeper/impl_test.go
+++ b/config_center/zookeeper/impl_test.go
@@ -24,7 +24,7 @@ import (
 )
 
 import (
-       "github.com/samuel/go-zookeeper/zk"
+       "github.com/dubbogo/go-zookeeper/zk"
        "github.com/stretchr/testify/assert"
 )
 
diff --git a/go.mod b/go.mod
index c89b397..f5ac8e5 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
        github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
        github.com/creasty/defaults v1.3.0
        github.com/dubbogo/getty v1.3.2
+       github.com/dubbogo/go-zookeeper v1.0.0
        github.com/dubbogo/gost v1.5.2
        github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // 
indirect
        github.com/go-errors/errors v1.0.1 // indirect
@@ -37,7 +38,6 @@ require (
        github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
        github.com/pkg/errors v0.8.1
        github.com/prometheus/client_golang v1.1.0 // indirect
-       github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
        github.com/satori/go.uuid v1.2.0
        github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // 
indirect
        github.com/soheilhy/cmux v0.1.4 // indirect
diff --git a/go.sum b/go.sum
index b23cb24..0f42e86 100644
--- a/go.sum
+++ b/go.sum
@@ -104,6 +104,8 @@ github.com/docker/go-units v0.3.3 
h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
 github.com/docker/go-units v0.3.3/go.mod 
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
 github.com/dubbogo/getty v1.3.2 h1:l1KVSs/1CtTKbIPTrkTtBT6S9ddvmswDGoAnnl2CDpM=
 github.com/dubbogo/getty v1.3.2/go.mod 
h1:ANbVQ9tbpZ2b0xdR8nRrgS/oXIsZAeRxzvPSOn/7mbk=
+github.com/dubbogo/go-zookeeper v1.0.0 
h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM=
+github.com/dubbogo/go-zookeeper v1.0.0/go.mod 
h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
 github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg=
 github.com/dubbogo/gost v1.5.1/go.mod 
h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
 github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
@@ -410,8 +412,6 @@ github.com/rogpeppe/fastuuid 
v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod 
h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 
h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE=
 github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod 
h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec 
h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod 
h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
 github.com/satori/go.uuid v1.2.0 
h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
 github.com/satori/go.uuid v1.2.0/go.mod 
h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 
h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
diff --git a/registry/base_register.go b/registry/base_register.go
new file mode 100644
index 0000000..5b9aef8
--- /dev/null
+++ b/registry/base_register.go
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+       "context"
+       "fmt"
+       "net/url"
+       "os"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+)
+
+import (
+       gxnet "github.com/dubbogo/gost/net"
+       perrors "github.com/pkg/errors"
+)
+
+import (
+       "github.com/apache/dubbo-go/common"
+       "github.com/apache/dubbo-go/common/constant"
+       "github.com/apache/dubbo-go/common/logger"
+)
+
+const (
+       // RegistryConnDelay connection delay
+       RegistryConnDelay = 3
+       // MaxWaitInterval max wait interval
+       MaxWaitInterval = 3 * time.Second
+)
+
+var (
+       processID = ""
+       localIP   = ""
+)
+
+// init records the current process ID and the local IP once at package load;
+// register adds them to every registered URL as the "pid" and "ip" params.
+func init() {
+       processID = fmt.Sprintf("%d", os.Getpid())
+       // The error from GetLocalIP is discarded here, so a lookup failure is not
+       // reported (presumably localIP is then left empty — TODO confirm).
+       localIP, _ = gxnet.GetLocalIP()
+}
+
+/*
+ * -----------------------------------NOTICE---------------------------------------------
+ * Unless there is a special case, embed BaseRegistry and implement the
+ * FacadeBasedRegistry interface instead of implementing the Registry interface directly.
+ * --------------------------------------------------------------------------------------
+ */
+
+// FacadeBasedRegistry extends Registry with the backend-specific hooks that
+// BaseRegistry calls. A concrete registry embeds BaseRegistry, implements these
+// methods and hands itself to InitBaseRegistry.
+type FacadeBasedRegistry interface {
+       Registry
+       // CreatePath is called (with BaseRegistry's cltLock held) with a
+       // "/dubbo/<service>/<category>" path before a node is registered under it.
+       CreatePath(string) error
+       // DoRegister receives the category path and the query-escaped service URL
+       // built by BaseRegistry.register.
+       DoRegister(string, string) error
+       // DoSubscribe is called by BaseRegistry.Subscribe; the returned Listener is
+       // polled with Next until it reports an error.
+       DoSubscribe(conf *common.URL) (Listener, error)
+       // CloseAndNilClient is called last during Destroy, with cltLock held.
+       CloseAndNilClient()
+       // CloseListener is called first during Destroy.
+       CloseListener()
+       // InitListeners is called at the end of RestartCallBack.
+       InitListeners()
+}
+
+// BaseRegistry holds the registration logic shared by registry implementations
+// and delegates backend-specific work to facadeBasedRegistry. It implements the
+// Registry interface.
+type BaseRegistry struct {
+       // context is not assigned anywhere in the code visible here.
+       context             context.Context
+       // facadeBasedRegistry is the concrete registry passed to InitBaseRegistry.
+       facadeBasedRegistry FacadeBasedRegistry
+       *common.URL
+       birth    int64          // creation time; InitBaseRegistry stores time.Now().UnixNano() (nanoseconds)
+       wg       sync.WaitGroup // wg+done for zk restart
+       done     chan struct{}  // closed by Destroy; IsAvailable reports false afterwards
+       cltLock  sync.Mutex            // cltLock guards the services map
+       services map[string]common.URL // conf.Key() -> service config; stores the services registered so far
+}
+
+// InitBaseRegistry initialises the embedded BaseRegistry: it stores the registry
+// url, stamps the creation time, allocates the done channel and the services
+// map, and remembers facadeRegistry (the concrete registry) for later callbacks.
+// It returns r itself as a Registry.
+func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry 
FacadeBasedRegistry) Registry {
+       r.URL = url
+       r.birth = time.Now().UnixNano()
+       r.done = make(chan struct{})
+       r.services = make(map[string]common.URL)
+       r.facadeBasedRegistry = facadeRegistry
+       return r
+}
+
+// GetUrl returns a copy of the registry URL passed to InitBaseRegistry.
+func (r *BaseRegistry) GetUrl() common.URL {
+       return *r.URL
+}
+
+// Destroy shuts the registry down gracefully: listeners first, then the done
+// signal, then it waits for everything tracked by the wait group, and finally
+// closes the backend client.
+// NOTE(review): done is closed unconditionally, so a second call to Destroy
+// would panic on the double close.
+func (r *BaseRegistry) Destroy() {
+       // first step: close all of the registry's listeners
+       r.facadeBasedRegistry.CloseListener()
+       // then close r.done to notify everyone selecting on Done()
+       close(r.done)
+       // wait for the wait group (external listeners finishing their shutdown)
+       r.wg.Wait()
+       // close the registry client and drop the services map
+       r.closeRegisters()
+}
+
+// Register implements Registry.Register. It rejects a conf whose Key() is
+// already present in r.services, otherwise registers it through r.register and
+// records it in r.services so RestartCallBack can re-register it later.
+// NOTE(review): the duplicate check and the insert are two separate critical
+// sections, so two concurrent calls with the same key can both pass the check.
+// NOTE(review): closeRegisters sets r.services to nil, so a Register call after
+// Destroy would write to a nil map.
+func (r *BaseRegistry) Register(conf common.URL) error {
+       var (
+               ok  bool
+               err error
+       )
+       // The Atoi error is ignored; role is only used for the debug log below.
+       role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+       // Check if the service has been registered
+       r.cltLock.Lock()
+       _, ok = r.services[conf.Key()]
+       r.cltLock.Unlock()
+       if ok {
+               return perrors.Errorf("Path{%s} has been registered", 
conf.Key())
+       }
+
+       err = r.register(conf)
+       if err != nil {
+               return perrors.WithMessagef(err, "register(conf:%+v)", conf)
+       }
+
+       // Remember the service only after the backend registration succeeded.
+       r.cltLock.Lock()
+       r.services[conf.Key()] = conf
+       r.cltLock.Unlock()
+       logger.Debugf("(%sRegistry)Register(conf{%#v})", 
common.DubboRole[role], conf)
+
+       return nil
+}
+
+// service returns c.Service() query-escaped, so it can be used as a single
+// segment of the "/dubbo/<service>/<category>" registry path.
+func (r *BaseRegistry) service(c common.URL) string {
+       return url.QueryEscape(c.Service())
+}
+
+// RestartCallBack for reregister when reconnect
+func (r *BaseRegistry) RestartCallBack() bool {
+
+       // copy r.services
+       services := []common.URL{}
+       for _, confIf := range r.services {
+               services = append(services, confIf)
+       }
+
+       flag := true
+       for _, confIf := range services {
+               err := r.register(confIf)
+               if err != nil {
+                       logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) 
= error{%#v}",
+                               confIf, perrors.WithStack(err))
+                       flag = false
+                       break
+               }
+               logger.Infof("success to re-register service :%v", confIf.Key())
+       }
+       r.facadeBasedRegistry.InitListeners()
+
+       return flag
+}
+
+// register publishes c to the backend registry. It copies c's params, adds the
+// "pid" and "ip" params, lets the role-specific helper (provider or consumer,
+// chosen by the registry URL's role param) create the category paths and build
+// the node path and raw URL, and finally hands the path plus the query-escaped
+// URL to the concrete registry's DoRegister.
+//
+// It returns an error when the role is neither provider nor consumer, when the
+// role-specific helper fails, or when DoRegister fails.
+func (r *BaseRegistry) register(c common.URL) error {
+       var (
+               err       error
+               rawURL    string
+               dubboPath string
+       )
+       params := url.Values{}
+
+       c.RangeParams(func(key, value string) bool {
+               params.Add(key, value)
+               return true
+       })
+
+       params.Add("pid", processID)
+       params.Add("ip", localIP)
+
+       // The Atoi error is deliberately ignored: an unparsable role becomes 0 and
+       // is then classified by the switch below like any other role value.
+       role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+       switch role {
+       case common.PROVIDER:
+               dubboPath, rawURL, err = r.providerRegistry(c, params)
+       case common.CONSUMER:
+               dubboPath, rawURL, err = r.consumerRegistry(c, params)
+       default:
+               return perrors.Errorf("@c{%v} type is not referencer or provider", c)
+       }
+       // Stop here if the helper failed; previously this error was overwritten by
+       // the DoRegister result and DoRegister was called with empty path and URL.
+       if err != nil {
+               return perrors.WithMessagef(err, "build registry node for %s", c.Key())
+       }
+
+       encodedURL := url.QueryEscape(rawURL)
+       // "$" is replaced by its percent-encoding in the node path.
+       dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
+       if err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL); err != nil {
+               return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
+       }
+       return nil
+}
+
+// providerRegistry prepares a provider registration for c. It validates that c
+// has a path and at least one method, creates the service's providers path in
+// the backend, adds the provider-specific params to params (anyhost, category,
+// dubbo, side, methods), and returns the node path and the raw (unescaped)
+// provider URL. params is modified in place.
+func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
+       var (
+               dubboPath string
+               rawURL    string
+               err       error
+       )
+       if c.Path == "" || len(c.Methods) == 0 {
+               return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
+       }
+       dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
+       r.cltLock.Lock()
+       err = r.facadeBasedRegistry.CreatePath(dubboPath)
+       r.cltLock.Unlock()
+       if err != nil {
+               logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
+               return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
+       }
+       params.Add("anyhost", "true")
+
+       // Dubbo java consumer to start looking for the provider url,because the category does not match,
+       // the provider will not find, causing the consumer can not start, so we use consumers.
+       // DubboRole               = [...]string{"consumer", "", "", "provider"}
+       // params.Add("category", (RoleType(PROVIDER)).Role())
+       params.Add("category", (common.RoleType(common.PROVIDER)).String())
+       params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
+
+       params.Add("side", (common.RoleType(common.PROVIDER)).Role())
+
+       // c.Methods is known to be non-empty here (checked above). The previous
+       // guard was `len(c.Methods) == 0`, which could never be true at this point,
+       // so "methods" was never added. Only add it when c's own params did not
+       // already carry one, to avoid a duplicate key.
+       if params.Get("methods") == "" {
+               params.Add("methods", strings.Join(c.Methods, ","))
+       }
+       logger.Debugf("provider url params:%#v", params)
+
+       // Fall back to the local IP when the URL does not carry one.
+       host := c.Ip
+       if host == "" {
+               host = localIP
+       }
+       host += ":" + c.Port
+
+       rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
+       // Print your own registration service providers.
+       dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
+       logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
+       return dubboPath, rawURL, nil
+}
+
+// consumerRegistry prepares a consumer registration for c. It creates both the
+// consumers path and the providers path of the service in the backend, adds the
+// consumer-specific params (protocol, category, dubbo) to params, and returns
+// the consumer node path together with the raw "consumer://" URL. params is
+// modified in place.
+func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) 
(string, string, error) {
+       var (
+               dubboPath string
+               rawURL    string
+               err       error
+       )
+       // Create the consumers path for the service.
+       dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
common.DubboNodes[common.CONSUMER])
+       r.cltLock.Lock()
+       err = r.facadeBasedRegistry.CreatePath(dubboPath)
+       r.cltLock.Unlock()
+       if err != nil {
+               logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = 
error{%v}", dubboPath, perrors.WithStack(err))
+               return "", "", perrors.WithStack(err)
+       }
+       // Also create the providers path for the same service.
+       dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
common.DubboNodes[common.PROVIDER])
+       r.cltLock.Lock()
+       err = r.facadeBasedRegistry.CreatePath(dubboPath)
+       r.cltLock.Unlock()
+       if err != nil {
+               logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = 
error{%v}", dubboPath, perrors.WithStack(err))
+               return "", "", perrors.WithStack(err)
+       }
+
+       params.Add("protocol", c.Protocol)
+       params.Add("category", (common.RoleType(common.CONSUMER)).String())
+       params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
+
+       // The consumer URL always uses the local IP as its host.
+       rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, 
params.Encode())
+       dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
(common.RoleType(common.CONSUMER)).String())
+
+       logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
+       return dubboPath, rawURL, nil
+}
+
+// sleepWait blocks for (n+1)*200ms (2e8 nanoseconds per step), capped at
+// MaxWaitInterval. Subscribe uses it as a growing back-off between attempts.
+func sleepWait(n int) {
+       wait := time.Duration((n + 1) * 2e8)
+       if wait > MaxWaitInterval {
+               wait = MaxWaitInterval
+       }
+       time.Sleep(wait)
+}
+
+// Subscribe obtains a listener for url from the concrete registry and forwards
+// every service event to notifyListener. It blocks: the loop only returns once
+// the registry is no longer available (Destroy closed r.done). When the
+// listener fails, it is closed and a new one is requested after a back-off.
+func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener 
NotifyListener) {
+       n := 0
+       for {
+               n++
+               if !r.IsAvailable() {
+                       logger.Warnf("event listener game over.")
+                       return
+               }
+
+               listener, err := r.facadeBasedRegistry.DoSubscribe(url)
+               if err != nil {
+                       if !r.IsAvailable() {
+                               logger.Warnf("event listener game over.")
+                               return
+                       }
+                       logger.Warnf("getListener() = err:%v", 
perrors.WithStack(err))
+                       // Fixed delay before asking for a listener again.
+                       time.Sleep(time.Duration(RegistryConnDelay) * 
time.Second)
+                       continue
+               }
+
+               // Pump events until the listener reports an error.
+               for {
+                       if serviceEvent, err := listener.Next(); err != nil {
+                               logger.Warnf("Selector.watch() = error{%v}", 
perrors.WithStack(err))
+                               listener.Close()
+                               break
+                       } else {
+                               logger.Infof("update begin, service event: %v", 
serviceEvent.String())
+                               notifyListener.Notify(serviceEvent)
+                       }
+
+               }
+               // Back off (growing with the attempt count n) before re-subscribing.
+               sleepWait(n)
+       }
+}
+
+// closeRegisters closes and drops the backend client through the concrete
+// registry and discards the services map, all under cltLock. Destroy calls it
+// as its last step.
+func (r *BaseRegistry) closeRegisters() {
+       r.cltLock.Lock()
+       defer r.cltLock.Unlock()
+       logger.Infof("begin to close provider client")
+       // Close and remove(set to nil) the registry client
+       r.facadeBasedRegistry.CloseAndNilClient()
+       // reset the services map
+       r.services = nil
+}
+
+// IsAvailable reports whether the registry is still open. It performs a
+// non-blocking check of r.done and returns false once Destroy has closed it.
+func (r *BaseRegistry) IsAvailable() bool {
+       select {
+       case <-r.done:
+               return false
+       default:
+               return true
+       }
+}
+
+// WaitGroup exposes the registry's wait group so that outside code can register
+// work that Destroy must wait for before it closes the client (graceful down).
+func (r *BaseRegistry) WaitGroup() *sync.WaitGroup {
+       return &r.wg
+}
+
+// Done returns the channel that Destroy closes, so outside code can select on
+// it to learn that the registry is shutting down.
+func (r *BaseRegistry) Done() chan struct{} {
+       return r.done
+}
diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go
index 31d62fa..5ed56f6 100644
--- a/registry/etcdv3/listener.go
+++ b/registry/etcdv3/listener.go
@@ -39,6 +39,7 @@ type dataListener struct {
        listener      config_center.ConfigurationListener
 }
 
+// NewRegistryDataListener ...
 func NewRegistryDataListener(listener config_center.ConfigurationListener) 
*dataListener {
        return &dataListener{listener: listener, interestedURL: []*common.URL{}}
 }
@@ -77,9 +78,10 @@ type configurationListener struct {
        events   chan *config_center.ConfigChangeEvent
 }
 
+// NewConfigurationListener for listening the event of etcdv3.
 func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
        // add a new waiter
-       reg.wg.Add(1)
+       reg.WaitGroup().Add(1)
        return &configurationListener{registry: reg, events: make(chan 
*config_center.ConfigChangeEvent, 32)}
 }
 func (l *configurationListener) Process(configType 
*config_center.ConfigChangeEvent) {
@@ -89,7 +91,7 @@ func (l *configurationListener) Process(configType 
*config_center.ConfigChangeEv
 func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
        for {
                select {
-               case <-l.registry.done:
+               case <-l.registry.Done():
                        logger.Warnf("listener's etcd client connection is 
broken, so etcd event listener exit now.")
                        return nil, perrors.New("listener stopped")
 
@@ -97,7 +99,7 @@ func (l *configurationListener) Next() 
(*registry.ServiceEvent, error) {
                        logger.Infof("got etcd event %#v", e)
                        if e.ConfigType == remoting.EventTypeDel {
                                select {
-                               case <-l.registry.done:
+                               case <-l.registry.Done():
                                        logger.Warnf("update @result{%s}. But 
its connection to registry is invalid", e.Value)
                                default:
                                }
@@ -108,5 +110,5 @@ func (l *configurationListener) Next() 
(*registry.ServiceEvent, error) {
        }
 }
 func (l *configurationListener) Close() {
-       l.registry.wg.Done()
+       l.registry.WaitGroup().Done()
 }
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index b058113..e1c2576 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -19,17 +19,13 @@ package etcdv3
 
 import (
        "fmt"
-       "net/url"
-       "os"
        "path"
-       "strconv"
        "strings"
        "sync"
        "time"
 )
 
 import (
-       gxnet "github.com/dubbogo/gost/net"
        perrors "github.com/pkg/errors"
 )
 
@@ -42,74 +38,39 @@ import (
        "github.com/apache/dubbo-go/remoting/etcdv3"
 )
 
-var (
-       processID = ""
-       localIP   = ""
-)
-
 const (
-       Name              = "etcdv3"
-       RegistryConnDelay = 3
+       // Name module name
+       Name = "etcdv3"
 )
 
 func init() {
-       processID = fmt.Sprintf("%d", os.Getpid())
-       localIP, _ = gxnet.GetLocalIP()
        extension.SetRegistry(Name, newETCDV3Registry)
 }
 
 type etcdV3Registry struct {
-       *common.URL
-       birth int64 // time of file birth, seconds since Epoch; 0 if unknown
-
-       cltLock  sync.Mutex
-       client   *etcdv3.Client
-       services map[string]common.URL // service name + protocol -> service 
config
-
+       registry.BaseRegistry
+       cltLock        sync.Mutex
+       client         *etcdv3.Client
        listenerLock   sync.Mutex
        listener       *etcdv3.EventListener
        dataListener   *dataListener
        configListener *configurationListener
-
-       wg   sync.WaitGroup // wg+done for etcd client restart
-       done chan struct{}
 }
 
+// Client get the etcdv3 client
 func (r *etcdV3Registry) Client() *etcdv3.Client {
        return r.client
 }
+
+//SetClient set the etcdv3 client
 func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
        r.client = client
 }
+
+//
 func (r *etcdV3Registry) ClientLock() *sync.Mutex {
        return &r.cltLock
 }
-func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
-       return &r.wg
-}
-func (r *etcdV3Registry) GetDone() chan struct{} {
-       return r.done
-}
-func (r *etcdV3Registry) RestartCallBack() bool {
-
-       services := []common.URL{}
-       for _, confIf := range r.services {
-               services = append(services, confIf)
-       }
-
-       flag := true
-       for _, confIf := range services {
-               err := r.Register(confIf)
-               if err != nil {
-                       
logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
-                               confIf, perrors.WithStack(err))
-                       flag = false
-                       break
-               }
-               logger.Infof("success to re-register service :%v", confIf.Key())
-       }
-       return flag
-}
 
 func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
 
@@ -122,12 +83,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, 
error) {
 
        logger.Infof("etcd address is: %v, timeout is: %s", url.Location, 
timeout.String())
 
-       r := &etcdV3Registry{
-               URL:      url,
-               birth:    time.Now().UnixNano(),
-               done:     make(chan struct{}),
-               services: make(map[string]common.URL),
-       }
+       r := &etcdV3Registry{}
+
+       r.InitBaseRegistry(url, r)
 
        if err := etcdv3.ValidateClient(
                r,
@@ -137,89 +95,37 @@ func newETCDV3Registry(url *common.URL) 
(registry.Registry, error) {
        ); err != nil {
                return nil, err
        }
+       r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1
 
-       r.wg.Add(1)
        go etcdv3.HandleClientRestart(r)
 
-       r.listener = etcdv3.NewEventListener(r.client)
-       r.configListener = NewConfigurationListener(r)
-       r.dataListener = NewRegistryDataListener(r.configListener)
+       r.InitListeners()
 
        return r, nil
 }
 
-func (r *etcdV3Registry) GetUrl() common.URL {
-       return *r.URL
-}
-
-func (r *etcdV3Registry) IsAvailable() bool {
-
-       select {
-       case <-r.done:
-               return false
-       default:
-               return true
-       }
+func (r *etcdV3Registry) InitListeners() {
+       r.listener = etcdv3.NewEventListener(r.client)
+       r.configListener = NewConfigurationListener(r)
+       r.dataListener = NewRegistryDataListener(r.configListener)
 }
 
-func (r *etcdV3Registry) Destroy() {
-
-       if r.configListener != nil {
-               r.configListener.Close()
-       }
-       r.stop()
+func (r *etcdV3Registry) DoRegister(root string, node string) error {
+       return r.client.Create(path.Join(root, node), "")
 }
 
-func (r *etcdV3Registry) stop() {
-
-       close(r.done)
-
-       // close current client
+func (r *etcdV3Registry) CloseAndNilClient() {
        r.client.Close()
-
-       r.cltLock.Lock()
        r.client = nil
-       r.services = nil
-       r.cltLock.Unlock()
 }
 
-func (r *etcdV3Registry) Register(svc common.URL) error {
-
-       role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-       if err != nil {
-               return perrors.WithMessage(err, "get registry role")
-       }
-
-       r.cltLock.Lock()
-       if _, ok := r.services[svc.Key()]; ok {
-               r.cltLock.Unlock()
-               return perrors.New(fmt.Sprintf("Path{%s} has been registered", 
svc.Path))
-       }
-       r.cltLock.Unlock()
-
-       switch role {
-       case common.PROVIDER:
-               logger.Debugf("(provider register )Register(conf{%#v})", svc)
-               if err := r.registerProvider(svc); err != nil {
-                       return perrors.WithMessage(err, "register provider")
-               }
-       case common.CONSUMER:
-               logger.Debugf("(consumer register )Register(conf{%#v})", svc)
-               if err := r.registerConsumer(svc); err != nil {
-                       return perrors.WithMessage(err, "register consumer")
-               }
-       default:
-               return perrors.New(fmt.Sprintf("unknown role %d", role))
+func (r *etcdV3Registry) CloseListener() {
+       if r.configListener != nil {
+               r.configListener.Close()
        }
-
-       r.cltLock.Lock()
-       r.services[svc.Key()] = svc
-       r.cltLock.Unlock()
-       return nil
 }
 
-func (r *etcdV3Registry) createDirIfNotExist(k string) error {
-
+func (r *etcdV3Registry) CreatePath(k string) error {
        var tmpPath string
        for _, str := range strings.Split(k, "/")[1:] {
                tmpPath = path.Join(tmpPath, "/", str)
@@ -231,89 +137,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) 
error {
        return nil
 }
 
-func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
-
-       consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), 
common.DubboNodes[common.CONSUMER])
-       if err := r.createDirIfNotExist(consumersNode); err != nil {
-               logger.Errorf("etcd client create path %s: %v", consumersNode, 
err)
-               return perrors.WithMessage(err, "etcd create consumer nodes")
-       }
-       providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), 
common.DubboNodes[common.PROVIDER])
-       if err := r.createDirIfNotExist(providersNode); err != nil {
-               return perrors.WithMessage(err, "create provider node")
-       }
-
-       params := url.Values{}
-
-       params.Add("protocol", svc.Protocol)
-
-       params.Add("category", (common.RoleType(common.CONSUMER)).String())
-       params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
-       encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", 
localIP, svc.Path, params.Encode()))
-       dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), 
(common.RoleType(common.CONSUMER)).String())
-       if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != 
nil {
-               return perrors.WithMessagef(err, "create k/v in etcd (path:%s, 
url:%s)", dubboPath, encodedURL)
-       }
-
-       return nil
-}
-
-func (r *etcdV3Registry) registerProvider(svc common.URL) error {
-
-       if len(svc.Path) == 0 || len(svc.Methods) == 0 {
-               return perrors.New(fmt.Sprintf("service path %s or service 
method %s", svc.Path, svc.Methods))
-       }
-
-       var (
-               urlPath    string
-               encodedURL string
-               dubboPath  string
-       )
-
-       providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), 
common.DubboNodes[common.PROVIDER])
-       if err := r.createDirIfNotExist(providersNode); err != nil {
-               return perrors.WithMessage(err, "create provider node")
-       }
-
-       params := url.Values{}
-
-       svc.RangeParams(func(key, value string) bool {
-               params[key] = []string{value}
-               return true
-       })
-       params.Add("pid", processID)
-       params.Add("ip", localIP)
-       params.Add("anyhost", "true")
-       params.Add("category", (common.RoleType(common.PROVIDER)).String())
-       params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
-       params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
-       if len(svc.Methods) == 0 {
-               params.Add("methods", strings.Join(svc.Methods, ","))
-       }
-
-       logger.Debugf("provider url params:%#v", params)
-       var host string
-       if len(svc.Ip) == 0 {
-               host = localIP + ":" + svc.Port
-       } else {
-               host = svc.Ip + ":" + svc.Port
-       }
-
-       urlPath = svc.Path
-
-       encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, 
host, urlPath, params.Encode()))
-       dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), 
(common.RoleType(common.PROVIDER)).String())
-
-       if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != 
nil {
-               return perrors.WithMessagef(err, "create k/v in etcd (path:%s, 
url:%s)", dubboPath, encodedURL)
-       }
-
-       return nil
-}
-
-func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) 
{
+func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, 
error) {
 
        var (
                configListener *configurationListener
@@ -346,37 +170,3 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) 
(registry.Listener, error) {
 
        return configListener, nil
 }
-
-//subscribe from registry
-func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener 
registry.NotifyListener) {
-       for {
-               if !r.IsAvailable() {
-                       logger.Warnf("event listener game over.")
-                       return
-               }
-
-               listener, err := r.subscribe(url)
-               if err != nil {
-                       if !r.IsAvailable() {
-                               logger.Warnf("event listener game over.")
-                               return
-                       }
-                       logger.Warnf("getListener() = err:%v", 
perrors.WithStack(err))
-                       time.Sleep(time.Duration(RegistryConnDelay) * 
time.Second)
-                       continue
-               }
-
-               for {
-                       if serviceEvent, err := listener.Next(); err != nil {
-                               logger.Warnf("Selector.watch() = error{%v}", 
perrors.WithStack(err))
-                               listener.Close()
-                               return
-                       } else {
-                               logger.Infof("update begin, service event: %v", 
serviceEvent.String())
-                               notifyListener.Notify(serviceEvent)
-                       }
-
-               }
-
-       }
-}
diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go
index 6c05a8a..87cf240 100644
--- a/registry/etcdv3/registry_test.go
+++ b/registry/etcdv3/registry_test.go
@@ -46,7 +46,8 @@ func initRegistry(t *testing.T) *etcdV3Registry {
        }
 
        out := reg.(*etcdV3Registry)
-       out.client.CleanKV()
+       err = out.client.CleanKV()
+       assert.NoError(t, err)
        return out
 }
 
@@ -58,6 +59,7 @@ func (suite *RegistryTestSuite) TestRegister() {
 
        reg := initRegistry(t)
        err := reg.Register(url)
+       assert.NoError(t, err)
        children, _, err := 
reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers")
        if err != nil {
                t.Fatal(err)
@@ -83,8 +85,9 @@ func (suite *RegistryTestSuite) TestSubscribe() {
        regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
        reg2 := initRegistry(t)
 
-       reg2.Register(url)
-       listener, err := reg2.subscribe(&url)
+       err = reg2.Register(url)
+       assert.NoError(t, err)
+       listener, err := reg2.DoSubscribe(&url)
        if err != nil {
                t.Fatal(err)
        }
@@ -102,7 +105,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {
        url, _ := common.NewURL(context.Background(), 
"dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", 
common.WithParamsValue(constant.CLUSTER_KEY, "mock"), 
common.WithMethods([]string{"GetUser", "AddUser"}))
 
        reg := initRegistry(t)
-       _, err := reg.subscribe(&url)
+       _, err := reg.DoSubscribe(&url)
        if err != nil {
                t.Fatal(err)
        }
@@ -120,7 +123,8 @@ func (suite *RegistryTestSuite) TestProviderDestory() {
        t := suite.T()
        reg := initRegistry(t)
        url, _ := common.NewURL(context.Background(), 
"dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", 
common.WithParamsValue(constant.CLUSTER_KEY, "mock"), 
common.WithMethods([]string{"GetUser", "AddUser"}))
-       reg.Register(url)
+       err := reg.Register(url)
+       assert.NoError(t, err)
 
        //listener.Close()
        time.Sleep(1e9)
diff --git a/registry/registry.go b/registry/registry.go
index c7279a2..d673864 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -21,7 +21,13 @@ import (
        "github.com/apache/dubbo-go/common"
 )
 
-// Extension - Registry
+/*
+ * 
-----------------------------------NOTICE---------------------------------------------
+ * If there is no special case, you'd better inherit BaseRegistry and 
implement the
+ * FacadeBasedRegistry interface instead of directly implementing the Registry 
interface.
+ * 
--------------------------------------------------------------------------------------
+ */
+// Registry Extension - Registry
 type Registry interface {
        common.Node
        //used for service provider calling , register services to registry
@@ -38,11 +44,13 @@ type Registry interface {
        //mode2 : callback mode, subscribe with notify(notify listener).
        Subscribe(*common.URL, NotifyListener)
 }
+
+// NotifyListener ...
 type NotifyListener interface {
        Notify(*ServiceEvent)
 }
 
-//Deprecated!
+// Listener Deprecated!
 type Listener interface {
        Next() (*ServiceEvent, error)
        Close()
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 53a5926..e895243 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -36,18 +36,23 @@ import (
        zk "github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
+// RegistryDataListener ...
 type RegistryDataListener struct {
        interestedURL []*common.URL
        listener      config_center.ConfigurationListener
 }
 
+// NewRegistryDataListener ...
 func NewRegistryDataListener(listener config_center.ConfigurationListener) 
*RegistryDataListener {
        return &RegistryDataListener{listener: listener, interestedURL: 
[]*common.URL{}}
 }
+
+// AddInterestedURL ...
 func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
        l.interestedURL = append(l.interestedURL, url)
 }
 
+// DataChange ...
 func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
        // Intercept the last bit
        index := strings.Index(eventType.Path, "/providers/")
@@ -71,6 +76,7 @@ func (l *RegistryDataListener) DataChange(eventType 
remoting.Event) bool {
        return false
 }
 
+// RegistryConfigurationListener ...
 type RegistryConfigurationListener struct {
        client    *zk.ZookeeperClient
        registry  *zkRegistry
@@ -79,14 +85,18 @@ type RegistryConfigurationListener struct {
        closeOnce sync.Once
 }
 
+// NewRegistryConfigurationListener for listening the event of zk.
 func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg 
*zkRegistry) *RegistryConfigurationListener {
-       reg.wg.Add(1)
+       reg.WaitGroup().Add(1)
        return &RegistryConfigurationListener{client: client, registry: reg, 
events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
 }
+
+// Process ...
 func (l *RegistryConfigurationListener) Process(configType 
*config_center.ConfigChangeEvent) {
        l.events <- configType
 }
 
+// Next ...
 func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) 
{
        for {
                select {
@@ -94,7 +104,7 @@ func (l *RegistryConfigurationListener) Next() 
(*registry.ServiceEvent, error) {
                        logger.Warnf("listener's zk client connection is 
broken, so zk event listener exit now.")
                        return nil, perrors.New("listener stopped")
 
-               case <-l.registry.done:
+               case <-l.registry.Done():
                        logger.Warnf("zk consumer register has quit, so zk 
event listener exit now.")
                        return nil, perrors.New("listener stopped")
 
@@ -111,11 +121,13 @@ func (l *RegistryConfigurationListener) Next() 
(*registry.ServiceEvent, error) {
                }
        }
 }
+
+// Close ...
 func (l *RegistryConfigurationListener) Close() {
        // ensure that the listener will be closed at most once.
        l.closeOnce.Do(func() {
                l.isClosed = true
-               l.registry.wg.Done()
+               l.registry.WaitGroup().Done()
        })
 }
 
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 24c4158..f4e53dc 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -18,20 +18,16 @@
 package zookeeper
 
 import (
-       "context"
        "fmt"
        "net/url"
-       "os"
-       "strconv"
        "strings"
        "sync"
        "time"
 )
 
 import (
-       gxnet "github.com/dubbogo/gost/net"
+       "github.com/dubbogo/go-zookeeper/zk"
        perrors "github.com/pkg/errors"
-       "github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -44,20 +40,11 @@ import (
 )
 
 const (
-       RegistryZkClient  = "zk registry"
-       RegistryConnDelay = 3
-       MaxWaitInterval   = time.Duration(3e9)
-)
-
-var (
-       processID = ""
-       localIP   = ""
+       // RegistryZkClient zk client name
+       RegistryZkClient = "zk registry"
 )
 
 func init() {
-       processID = fmt.Sprintf("%d", os.Getpid())
-       localIP, _ = gxnet.GetLocalIP()
-       //plugins.PluggableRegistries["zookeeper"] = newZkRegistry
        extension.SetRegistry("zookeeper", newZkRegistry)
 }
 
@@ -66,20 +53,13 @@ func init() {
 /////////////////////////////////////
 
 type zkRegistry struct {
-       context context.Context
-       *common.URL
-       birth int64          // time of file birth, seconds since Epoch; 0 if 
unknown
-       wg    sync.WaitGroup // wg+done for zk restart
-       done  chan struct{}
-
-       cltLock  sync.Mutex
-       client   *zookeeper.ZookeeperClient
-       services map[string]common.URL // service name + protocol -> service 
config
-
+       registry.BaseRegistry
+       client         *zookeeper.ZookeeperClient
        listenerLock   sync.Mutex
        listener       *zookeeper.ZkEventListener
        dataListener   *RegistryDataListener
        configListener *RegistryConfigurationListener
+       cltLock        sync.Mutex
        //for provider
        zkPath map[string]int // key = protocol://ip:port/interface
 }
@@ -89,21 +69,17 @@ func newZkRegistry(url *common.URL) (registry.Registry, 
error) {
                err error
                r   *zkRegistry
        )
-
        r = &zkRegistry{
-               URL:      url,
-               birth:    time.Now().UnixNano(),
-               done:     make(chan struct{}),
-               services: make(map[string]common.URL),
-               zkPath:   make(map[string]int),
+               zkPath: make(map[string]int),
        }
+       r.InitBaseRegistry(url, r)
 
        err = zookeeper.ValidateZookeeperClient(r, 
zookeeper.WithZkName(RegistryZkClient))
        if err != nil {
                return nil, err
        }
+       r.WaitGroup().Add(1) // zk client started successfully, so increment wg
 
-       r.wg.Add(1)
        go zookeeper.HandleClientRestart(r)
 
        r.listener = zookeeper.NewZkEventListener(r.client)
@@ -113,10 +89,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, 
error) {
        return r, nil
 }
 
+// Options ...
 type Options struct {
        client *zookeeper.ZookeeperClient
 }
 
+// Option ...
 type Option func(*Options)
 
 func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) 
(*zk.TestCluster, *zkRegistry, error) {
@@ -128,27 +106,41 @@ func newMockZkRegistry(url *common.URL, opts 
...zookeeper.Option) (*zk.TestClust
        )
 
        r = &zkRegistry{
-               URL:      url,
-               birth:    time.Now().UnixNano(),
-               done:     make(chan struct{}),
-               services: make(map[string]common.URL),
-               zkPath:   make(map[string]int),
+               zkPath: make(map[string]int),
        }
-
+       r.InitBaseRegistry(url, r)
        c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 
15*time.Second, opts...)
        if err != nil {
                return nil, nil, err
        }
-       r.wg.Add(1)
+       r.WaitGroup().Add(1) // zk client started successfully, so increment wg
        go zookeeper.HandleClientRestart(r)
+       r.InitListeners()
+       return c, r, nil
+}
 
+func (r *zkRegistry) InitListeners() {
        r.listener = zookeeper.NewZkEventListener(r.client)
        r.configListener = NewRegistryConfigurationListener(r.client, r)
        r.dataListener = NewRegistryDataListener(r.configListener)
+}
 
-       return c, r, nil
+func (r *zkRegistry) CreatePath(path string) error {
+       return r.ZkClient().Create(path)
+}
+
+func (r *zkRegistry) DoRegister(root string, node string) error {
+       return r.registerTempZookeeperNode(root, node)
+}
+
+func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
+       return r.getListener(conf)
 }
 
+func (r *zkRegistry) CloseAndNilClient() {
+       r.client.Close()
+       r.client = nil
+}
 func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
        return r.client
 }
@@ -161,222 +153,10 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex {
        return &r.cltLock
 }
 
-func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
-       return &r.wg
-}
-
-func (r *zkRegistry) GetDone() chan struct{} {
-       return r.done
-}
-
-func (r *zkRegistry) GetUrl() common.URL {
-       return *r.URL
-}
-
-func (r *zkRegistry) Destroy() {
+func (r *zkRegistry) CloseListener() {
        if r.configListener != nil {
                r.configListener.Close()
        }
-       close(r.done)
-       r.wg.Wait()
-       r.closeRegisters()
-}
-
-func (r *zkRegistry) RestartCallBack() bool {
-
-       // copy r.services
-       services := []common.URL{}
-       for _, confIf := range r.services {
-               services = append(services, confIf)
-       }
-
-       flag := true
-       for _, confIf := range services {
-               err := r.register(confIf)
-               if err != nil {
-                       logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) 
= error{%#v}",
-                               confIf, perrors.WithStack(err))
-                       flag = false
-                       break
-               }
-               logger.Infof("success to re-register service :%v", confIf.Key())
-       }
-       r.listener = zookeeper.NewZkEventListener(r.client)
-       r.configListener = NewRegistryConfigurationListener(r.client, r)
-       r.dataListener = NewRegistryDataListener(r.configListener)
-
-       return flag
-}
-
-func (r *zkRegistry) Register(conf common.URL) error {
-       var (
-               ok  bool
-               err error
-       )
-       role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-       switch role {
-       case common.CONSUMER:
-               r.cltLock.Lock()
-               _, ok = r.services[conf.Key()]
-               r.cltLock.Unlock()
-               if ok {
-                       return perrors.Errorf("Path{%s} has been registered", 
conf.Path)
-               }
-
-               err = r.register(conf)
-               if err != nil {
-                       return perrors.WithStack(err)
-               }
-
-               r.cltLock.Lock()
-               r.services[conf.Key()] = conf
-               r.cltLock.Unlock()
-               
logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
-
-       case common.PROVIDER:
-
-               // Check if the service has been registered
-               r.cltLock.Lock()
-               // Note the difference between consumer and 
consumerZookeeperRegistry (consumer use conf.Path).
-               // Because the consumer wants to provide monitoring functions 
for the selector,
-               // the provider allows multiple groups or versions of the same 
service to be registered.
-               _, ok = r.services[conf.Key()]
-               r.cltLock.Unlock()
-               if ok {
-                       return perrors.Errorf("Path{%s} has been registered", 
conf.Key())
-               }
-
-               err = r.register(conf)
-               if err != nil {
-                       return perrors.WithMessagef(err, "register(conf:%+v)", 
conf)
-               }
-
-               r.cltLock.Lock()
-               r.services[conf.Key()] = conf
-               r.cltLock.Unlock()
-
-               logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
-       }
-
-       return nil
-}
-
-func (r *zkRegistry) service(c common.URL) string {
-       return url.QueryEscape(c.Service())
-}
-
-func (r *zkRegistry) register(c common.URL) error {
-       var (
-               err error
-               //revision   string
-               params     url.Values
-               rawURL     string
-               encodedURL string
-               dubboPath  string
-               //conf       config.URL
-       )
-
-       err = zookeeper.ValidateZookeeperClient(r, 
zookeeper.WithZkName(RegistryZkClient))
-       if err != nil {
-               return perrors.WithStack(err)
-       }
-       params = url.Values{}
-
-       c.RangeParams(func(key, value string) bool {
-               params.Add(key, value)
-               return true
-       })
-
-       params.Add("pid", processID)
-       params.Add("ip", localIP)
-       //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
-
-       role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-       switch role {
-
-       case common.PROVIDER:
-
-               if c.Path == "" || len(c.Methods) == 0 {
-                       return perrors.Errorf("conf{Path:%s, Methods:%s}", 
c.Path, c.Methods)
-               }
-               // 先创建服务下面的provider node
-               dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
common.DubboNodes[common.PROVIDER])
-               r.cltLock.Lock()
-               err = r.client.Create(dubboPath)
-               r.cltLock.Unlock()
-               if err != nil {
-                       logger.Errorf("zkClient.create(path{%s}) = error{%#v}", 
dubboPath, perrors.WithStack(err))
-                       return perrors.WithMessagef(err, 
"zkclient.Create(path:%s)", dubboPath)
-               }
-               params.Add("anyhost", "true")
-
-               // Dubbo java consumer to start looking for the provider 
url,because the category does not match,
-               // the provider will not find, causing the consumer can not 
start, so we use consumers.
-               // DubboRole               = [...]string{"consumer", "", "", 
"provider"}
-               // params.Add("category", (RoleType(PROVIDER)).Role())
-               params.Add("category", 
(common.RoleType(common.PROVIDER)).String())
-               params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
-
-               params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
-               if len(c.Methods) == 0 {
-                       params.Add("methods", strings.Join(c.Methods, ","))
-               }
-               logger.Debugf("provider zk url params:%#v", params)
-               var host string
-               if c.Ip == "" {
-                       host = localIP + ":" + c.Port
-               } else {
-                       host = c.Ip + ":" + c.Port
-               }
-
-               rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, 
params.Encode())
-               encodedURL = url.QueryEscape(rawURL)
-
-               // Print your own registration service providers.
-               dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
(common.RoleType(common.PROVIDER)).String())
-               logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
-
-       case common.CONSUMER:
-               dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
common.DubboNodes[common.CONSUMER])
-               r.cltLock.Lock()
-               err = r.client.Create(dubboPath)
-               r.cltLock.Unlock()
-               if err != nil {
-                       logger.Errorf("zkClient.create(path{%s}) = error{%v}", 
dubboPath, perrors.WithStack(err))
-                       return perrors.WithStack(err)
-               }
-               dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
common.DubboNodes[common.PROVIDER])
-               r.cltLock.Lock()
-               err = r.client.Create(dubboPath)
-               r.cltLock.Unlock()
-               if err != nil {
-                       logger.Errorf("zkClient.create(path{%s}) = error{%v}", 
dubboPath, perrors.WithStack(err))
-                       return perrors.WithStack(err)
-               }
-
-               params.Add("protocol", c.Protocol)
-
-               params.Add("category", 
(common.RoleType(common.CONSUMER)).String())
-               params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
-               rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, 
params.Encode())
-               encodedURL = url.QueryEscape(rawURL)
-
-               dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), 
(common.RoleType(common.CONSUMER)).String())
-               logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
-
-       default:
-               return perrors.Errorf("@c{%v} type is not referencer or 
provider", c)
-       }
-
-       dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
-       err = r.registerTempZookeeperNode(dubboPath, encodedURL)
-
-       if err != nil {
-               return perrors.WithMessagef(err, 
"registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
-       }
-       return nil
 }
 
 func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error 
{
@@ -406,53 +186,6 @@ func (r *zkRegistry) registerTempZookeeperNode(root 
string, node string) error {
        return nil
 }
 
-func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
-       return r.getListener(conf)
-}
-func sleepWait(n int) {
-       wait := time.Duration((n + 1) * 2e8)
-       if wait > MaxWaitInterval {
-               wait = MaxWaitInterval
-       }
-       time.Sleep(wait)
-}
-
-//subscribe from registry
-func (r *zkRegistry) Subscribe(url *common.URL, notifyListener 
registry.NotifyListener) {
-       n := 0
-       for {
-               n++
-               if !r.IsAvailable() {
-                       logger.Warnf("event listener game over.")
-                       return
-               }
-
-               listener, err := r.subscribe(url)
-               if err != nil {
-                       if !r.IsAvailable() {
-                               logger.Warnf("event listener game over.")
-                               return
-                       }
-                       logger.Warnf("getListener() = err:%v", 
perrors.WithStack(err))
-                       time.Sleep(time.Duration(RegistryConnDelay) * 
time.Second)
-                       continue
-               }
-
-               for {
-                       if serviceEvent, err := listener.Next(); err != nil {
-                               logger.Warnf("Selector.watch() = error{%v}", 
perrors.WithStack(err))
-                               listener.Close()
-                               break
-                       } else {
-                               logger.Infof("update begin, service event: %v", 
serviceEvent.String())
-                               notifyListener.Notify(serviceEvent)
-                       }
-
-               }
-               sleepWait(n)
-       }
-}
-
 func (r *zkRegistry) getListener(conf *common.URL) 
(*RegistryConfigurationListener, error) {
        var (
                zkListener *RegistryConfigurationListener
@@ -489,22 +222,3 @@ func (r *zkRegistry) getListener(conf *common.URL) 
(*RegistryConfigurationListen
 
        return zkListener, nil
 }
-
-func (r *zkRegistry) closeRegisters() {
-       r.cltLock.Lock()
-       defer r.cltLock.Unlock()
-       logger.Infof("begin to close provider zk client")
-       // Close the old client first to close the tmp node.
-       r.client.Close()
-       r.client = nil
-       r.services = nil
-}
-
-func (r *zkRegistry) IsAvailable() bool {
-       select {
-       case <-r.done:
-               return false
-       default:
-               return true
-       }
-}
diff --git a/registry/zookeeper/registry_test.go 
b/registry/zookeeper/registry_test.go
index 2c7bb90..5e5189c 100644
--- a/registry/zookeeper/registry_test.go
+++ b/registry/zookeeper/registry_test.go
@@ -64,7 +64,7 @@ func Test_Subscribe(t *testing.T) {
        _, reg2, _ := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
 
        reg2.Register(url)
-       listener, _ := reg2.subscribe(&url)
+       listener, _ := reg2.DoSubscribe(&url)
 
        serviceEvent, _ := listener.Next()
        assert.NoError(t, err)
@@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) {
        assert.NoError(t, err)
        err = reg.Register(url)
        assert.NoError(t, err)
-       _, err = reg.subscribe(&url)
+       _, err = reg.DoSubscribe(&url)
        assert.NoError(t, err)
 
        //listener.Close()
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 0509685..ba3ea6e 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -36,16 +36,22 @@ import (
 )
 
 const (
-       ConnDelay            = 3
-       MaxFailTimes         = 15
+       // ConnDelay connection delay
+       ConnDelay = 3
+       // MaxFailTimes max failure times
+       MaxFailTimes = 15
+       // RegistryETCDV3Client client name
        RegistryETCDV3Client = "etcd registry"
 )
 
 var (
+       // ErrNilETCDV3Client ...
        ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full 
describe the ERR
-       ErrKVPairNotFound  = perrors.New("k/v pair not found")
+       // ErrKVPairNotFound ...
+       ErrKVPairNotFound = perrors.New("k/v pair not found")
 )
 
+// Options ...
 type Options struct {
        name      string
        endpoints []string
@@ -54,30 +60,38 @@ type Options struct {
        heartbeat int // heartbeat second
 }
 
+// Option ...
 type Option func(*Options)
 
+// WithEndpoints ...
 func WithEndpoints(endpoints ...string) Option {
        return func(opt *Options) {
                opt.endpoints = endpoints
        }
 }
+
+// WithName ...
 func WithName(name string) Option {
        return func(opt *Options) {
                opt.name = name
        }
 }
+
+// WithTimeout ...
 func WithTimeout(timeout time.Duration) Option {
        return func(opt *Options) {
                opt.timeout = timeout
        }
 }
 
+// WithHeartbeat ...
 func WithHeartbeat(heartbeat int) Option {
        return func(opt *Options) {
                opt.heartbeat = heartbeat
        }
 }
 
+// ValidateClient ...
 func ValidateClient(container clientFacade, opts ...Option) error {
 
        options := &Options{
@@ -117,6 +131,7 @@ func ValidateClient(container clientFacade, opts ...Option) 
error {
        return nil
 }
 
+// Client ...
 type Client struct {
        lock sync.RWMutex
 
@@ -191,6 +206,7 @@ func (c *Client) stop() bool {
        return false
 }
 
+// Close ...
 func (c *Client) Close() {
 
        if c == nil {
@@ -309,6 +325,7 @@ func (c *Client) get(k string) (string, error) {
        return string(resp.Kvs[0].Value), nil
 }
 
+// CleanKV ...
 func (c *Client) CleanKV() error {
 
        c.lock.RLock()
@@ -408,10 +425,12 @@ func (c *Client) keepAliveKV(k string, v string) error {
        return nil
 }
 
+// Done ...
 func (c *Client) Done() <-chan struct{} {
        return c.exit
 }
 
+// Valid ...
 func (c *Client) Valid() bool {
        select {
        case <-c.exit:
@@ -428,6 +447,7 @@ func (c *Client) Valid() bool {
        return true
 }
 
+// Create ...
 func (c *Client) Create(k string, v string) error {
 
        err := c.put(k, v)
@@ -437,6 +457,7 @@ func (c *Client) Create(k string, v string) error {
        return nil
 }
 
+// Delete ...
 func (c *Client) Delete(k string) error {
 
        err := c.delete(k)
@@ -447,6 +468,7 @@ func (c *Client) Delete(k string) error {
        return nil
 }
 
+// RegisterTemp ...
 func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
 
        completeKey := path.Join(basePath, node)
@@ -459,6 +481,7 @@ func (c *Client) RegisterTemp(basePath string, node string) 
(string, error) {
        return completeKey, nil
 }
 
+// GetChildrenKVList ...
 func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
 
        kList, vList, err := c.getChildren(k)
@@ -468,6 +491,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, 
[]string, error) {
        return kList, vList, nil
 }
 
+// Get ...
 func (c *Client) Get(k string) (string, error) {
 
        v, err := c.get(k)
@@ -478,6 +502,7 @@ func (c *Client) Get(k string) (string, error) {
        return v, nil
 }
 
+// Watch ...
 func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
 
        wc, err := c.watch(k)
@@ -487,6 +512,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
        return wc, nil
 }
 
+// WatchWithPrefix ...
 func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
 
        wc, err := c.watchWithPrefix(prefix)
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index 499044b..35befc8 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -38,11 +38,12 @@ type clientFacade interface {
        SetClient(*Client)
        ClientLock() *sync.Mutex
        WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
-       GetDone() chan struct{}     //for etcd client control
+       Done() chan struct{}        //for etcd client control
        RestartCallBack() bool
        common.Node
 }
 
+// HandleClientRestart ...
 func HandleClientRestart(r clientFacade) {
 
        var (
@@ -54,7 +55,7 @@ func HandleClientRestart(r clientFacade) {
 LOOP:
        for {
                select {
-               case <-r.GetDone():
+               case <-r.Done():
                        logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
                        break LOOP
                        // re-register all services
@@ -71,7 +72,7 @@ LOOP:
                        failTimes = 0
                        for {
                                select {
-                               case <-r.GetDone():
+                               case <-r.Done():
                                        logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
                                        break LOOP
                                case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go
index a4d5805..a51a68b 100644
--- a/remoting/etcdv3/listener.go
+++ b/remoting/etcdv3/listener.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/dubbo-go/remoting"
 )
 
+// EventListener ...
 type EventListener struct {
        client     *Client
        keyMapLock sync.Mutex
@@ -40,6 +41,7 @@ type EventListener struct {
        wg         sync.WaitGroup
 }
 
+// NewEventListener ...
 func NewEventListener(client *Client) *EventListener {
        return &EventListener{
                client: client,
@@ -47,7 +49,7 @@ func NewEventListener(client *Client) *EventListener {
        }
 }
 
-// Listen on a spec key
+// ListenServiceNodeEvent Listen on a spec key
 // this method will return true when spec key deleted,
 // this method will return false when deep layer connection lose
 func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
@@ -134,7 +136,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
        panic("unreachable")
 }
 
-// Listen on a set of key with spec prefix
+// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
 func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
 
        l.wg.Add(1)
@@ -180,7 +182,7 @@ func timeSecondDuration(sec int) time.Duration {
        return time.Duration(sec) * time.Second
 }
 
-// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
+// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
 // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
 //                            |
 //                            --------> ListenServiceNodeEvent
@@ -229,6 +231,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
        }(key)
 }
 
+// Close ...
 func (l *EventListener) Close() {
        l.wg.Wait()
 }
diff --git a/remoting/listener.go b/remoting/listener.go
index 8d1e357..3713ba0 100644
--- a/remoting/listener.go
+++ b/remoting/listener.go
@@ -21,6 +21,7 @@ import (
        "fmt"
 )
 
+// DataListener ...
 type DataListener interface {
        DataChange(eventType Event) bool //bool is return for interface implement is interesting
 }
@@ -29,11 +30,15 @@ type DataListener interface {
 // event type
 //////////////////////////////////////////
 
+// EventType ...
 type EventType int
 
 const (
+       // EventTypeAdd ...
        EventTypeAdd = iota
+       // EventTypeDel ...
        EventTypeDel
+       // EventTypeUpdate ...
        EventTypeUpdate
 )
 
@@ -51,6 +56,7 @@ func (t EventType) String() string {
 // service event
 //////////////////////////////////////////
 
+// Event ...
 type Event struct {
        Path    string
        Action  EventType
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 19d6529..f95231b 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -25,8 +25,8 @@ import (
 )
 
 import (
+       "github.com/dubbogo/go-zookeeper/zk"
        perrors "github.com/pkg/errors"
-       "github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -35,14 +35,19 @@ import (
 )
 
 const (
-       ConnDelay    = 3
+       // ConnDelay connection delay interval
+       ConnDelay = 3
+       // MaxFailTimes max fail times
        MaxFailTimes = 15
 )
 
 var (
        errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
+       errNilChildren     = perrors.Errorf("has none children")
+       errNilNode         = perrors.Errorf("node does not exist")
 )
 
+// ZookeeperClient ...
 type ZookeeperClient struct {
        name          string
        ZkAddrs       []string
@@ -54,6 +59,7 @@ type ZookeeperClient struct {
        eventRegistry map[string][]*chan struct{}
 }
 
+// StateToString ...
 func StateToString(state zk.State) string {
        switch state {
        case zk.StateDisconnected:
@@ -85,6 +91,7 @@ func StateToString(state zk.State) string {
        return "zookeeper unknown state"
 }
 
+// Options ...
 type Options struct {
        zkName string
        client *ZookeeperClient
@@ -92,14 +99,17 @@ type Options struct {
        ts *zk.TestCluster
 }
 
+// Option ...
 type Option func(*Options)
 
+// WithZkName ...
 func WithZkName(name string) Option {
        return func(opt *Options) {
                opt.zkName = name
        }
 }
 
+// ValidateZookeeperClient ...
 func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
        var (
                err error
@@ -173,12 +183,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
        return z, nil
 }
 
+// WithTestCluster ...
 func WithTestCluster(ts *zk.TestCluster) Option {
        return func(opt *Options) {
                opt.ts = ts
        }
 }
 
+// NewMockZookeeperClient ...
 func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
        var (
                err   error
@@ -224,6 +236,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
        return ts, z, event, nil
 }
 
+// HandleZkEvent ...
 func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
        var (
                state int
@@ -248,11 +261,13 @@ LOOP:
                                logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
                                z.stop()
                                z.Lock()
-                               if z.Conn != nil {
-                                       z.Conn.Close()
-                                       z.Conn = nil
-                               }
+                               conn := z.Conn
+                               z.Conn = nil
                                z.Unlock()
+                               if conn != nil {
+                                       conn.Close()
+                               }
+
                                break LOOP
                        case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
                                logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
@@ -282,6 +297,7 @@ LOOP:
        }
 }
 
+// RegisterEvent ...
 func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
        if zkPath == "" || event == nil {
                return
@@ -296,6 +312,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
        z.Unlock()
 }
 
+// UnregisterEvent ...
 func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
        if zkPath == "" {
                return
@@ -322,6 +339,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
        }
 }
 
+// Done ...
 func (z *ZookeeperClient) Done() <-chan struct{} {
        return z.exit
 }
@@ -337,6 +355,7 @@ func (z *ZookeeperClient) stop() bool {
        return false
 }
 
+// ZkConnValid ...
 func (z *ZookeeperClient) ZkConnValid() bool {
        select {
        case <-z.exit:
@@ -354,6 +373,7 @@ func (z *ZookeeperClient) ZkConnValid() bool {
        return valid
 }
 
+// Close ...
 func (z *ZookeeperClient) Close() {
        if z == nil {
                return
@@ -362,14 +382,17 @@ func (z *ZookeeperClient) Close() {
        z.stop()
        z.Wait.Wait()
        z.Lock()
-       if z.Conn != nil {
-               z.Conn.Close()
-               z.Conn = nil
-       }
+       conn := z.Conn
+       z.Conn = nil
        z.Unlock()
+       if conn != nil {
+               conn.Close()
+       }
+
        logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
 }
 
+// Create ...
 func (z *ZookeeperClient) Create(basePath string) error {
        var (
                err     error
@@ -381,10 +404,12 @@ func (z *ZookeeperClient) Create(basePath string) error {
                tmpPath = path.Join(tmpPath, "/", str)
                err = errNilZkClientConn
                z.Lock()
-               if z.Conn != nil {
-                       _, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
-               }
+               conn := z.Conn
                z.Unlock()
+               if conn != nil {
+                       _, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
+               }
+
                if err != nil {
                        if err == zk.ErrNodeExists {
                                logger.Infof("zk.create(\"%s\") exists\n", tmpPath)
@@ -398,6 +423,7 @@ func (z *ZookeeperClient) Create(basePath string) error {
        return nil
 }
 
+// Delete ...
 func (z *ZookeeperClient) Delete(basePath string) error {
        var (
                err error
@@ -405,14 +431,16 @@ func (z *ZookeeperClient) Delete(basePath string) error {
 
        err = errNilZkClientConn
        z.Lock()
-       if z.Conn != nil {
-               err = z.Conn.Delete(basePath, -1)
-       }
+       conn := z.Conn
        z.Unlock()
+       if conn != nil {
+               err = conn.Delete(basePath, -1)
+       }
 
        return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
 }
 
+// RegisterTemp ...
 func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
        var (
                err     error
@@ -425,10 +453,12 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
        data = []byte("")
        zkPath = path.Join(basePath) + "/" + node
        z.Lock()
-       if z.Conn != nil {
-               tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
-       }
+       conn := z.Conn
        z.Unlock()
+       if conn != nil {
+               tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
+       }
+
        //if err != nil && err != zk.ErrNodeExists {
        if err != nil {
                logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
@@ -439,6 +469,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
        return tmpPath, nil
 }
 
+// RegisterTempSeq ...
 func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
        var (
                err     error
@@ -447,15 +478,17 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
 
        err = errNilZkClientConn
        z.Lock()
-       if z.Conn != nil {
-               tmpPath, err = z.Conn.Create(
+       conn := z.Conn
+       z.Unlock()
+       if conn != nil {
+               tmpPath, err = conn.Create(
                        path.Join(basePath)+"/",
                        data,
                        zk.FlagEphemeral|zk.FlagSequence,
                        zk.WorldACL(zk.PermAll),
                )
        }
-       z.Unlock()
+
        logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
        if err != nil && err != zk.ErrNodeExists {
                logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
@@ -467,37 +500,44 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
        return tmpPath, nil
 }
 
+// GetChildrenW ...
 func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
        var (
                err      error
                children []string
                stat     *zk.Stat
-               event    <-chan zk.Event
+               watcher  *zk.Watcher
        )
 
        err = errNilZkClientConn
        z.Lock()
-       if z.Conn != nil {
-               children, stat, event, err = z.Conn.ChildrenW(path)
-       }
+       conn := z.Conn
        z.Unlock()
+       if conn != nil {
+               children, stat, watcher, err = conn.ChildrenW(path)
+       }
+
        if err != nil {
+               if err == zk.ErrNoChildrenForEphemerals {
+                       return nil, nil, errNilChildren
+               }
                if err == zk.ErrNoNode {
-                       return nil, nil, perrors.Errorf("path{%s} has none children", path)
+                       return nil, nil, errNilNode
                }
                logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
                return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
        }
        if stat == nil {
-               return nil, nil, perrors.Errorf("path{%s} has none children", path)
+               return nil, nil, perrors.Errorf("path{%s} get stat is nil", path)
        }
        if len(children) == 0 {
-               return nil, nil, perrors.Errorf("path{%s} has none children", path)
+               return nil, nil, errNilChildren
        }
 
-       return children, event, nil
+       return children, watcher.EvtCh, nil
 }
 
+// GetChildren ...
 func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
        var (
                err      error
@@ -507,10 +547,12 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
 
        err = errNilZkClientConn
        z.Lock()
-       if z.Conn != nil {
-               children, stat, err = z.Conn.Children(path)
-       }
+       conn := z.Conn
        z.Unlock()
+       if conn != nil {
+               children, stat, err = conn.Children(path)
+       }
+
        if err != nil {
                if err == zk.ErrNoNode {
                        return nil, perrors.Errorf("path{%s} has none children", path)
@@ -522,25 +564,28 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
                return nil, perrors.Errorf("path{%s} has none children", path)
        }
        if len(children) == 0 {
-               return nil, perrors.Errorf("path{%s} has none children", path)
+               return nil, errNilChildren
        }
 
        return children, nil
 }
 
+// ExistW ...
 func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
        var (
-               exist bool
-               err   error
-               event <-chan zk.Event
+               exist   bool
+               err     error
+               watcher *zk.Watcher
        )
 
        err = errNilZkClientConn
        z.Lock()
-       if z.Conn != nil {
-               exist, _, event, err = z.Conn.ExistsW(zkPath)
-       }
+       conn := z.Conn
        z.Unlock()
+       if conn != nil {
+               exist, _, watcher, err = conn.ExistsW(zkPath)
+       }
+
        if err != nil {
                logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err))
                return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath)
@@ -550,9 +595,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
                return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath)
        }
 
-       return event, nil
+       return watcher.EvtCh, nil
 }
 
+// GetContent ...
 func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
        return z.Conn.Get(zkPath)
 }
diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go
index f1bd0c2..cb41eb3 100644
--- a/remoting/zookeeper/client_test.go
+++ b/remoting/zookeeper/client_test.go
@@ -24,7 +24,7 @@ import (
 )
 
 import (
-       "github.com/samuel/go-zookeeper/zk"
+       "github.com/dubbogo/go-zookeeper/zk"
        "github.com/stretchr/testify/assert"
 )
 
@@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
        states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
        verifyEventStateOrder(t, event, states, "event channel")
 }
+
+func Test_UnregisterEvent(t *testing.T) {
+       client := &ZookeeperClient{}
+       client.eventRegistry = make(map[string][]*chan struct{})
+       array := []*chan struct{}{}
+       array = append(array, new(chan struct{}))
+       client.eventRegistry["test"] = array
+       client.UnregisterEvent("test", new(chan struct{}))
+}
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index cdc7ead..055db4f 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -35,11 +35,12 @@ type zkClientFacade interface {
        SetZkClient(*ZookeeperClient)
        ZkClientLock() *sync.Mutex
        WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
-       GetDone() chan struct{}     //for zk client control
+       Done() chan struct{}        //for zk client control
        RestartCallBack() bool
        common.Node
 }
 
+// HandleClientRestart ...
 func HandleClientRestart(r zkClientFacade) {
        var (
                err error
@@ -51,7 +52,7 @@ func HandleClientRestart(r zkClientFacade) {
 LOOP:
        for {
                select {
-               case <-r.GetDone():
+               case <-r.Done():
                        logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
                        break LOOP
                        // re-register all services
@@ -67,7 +68,7 @@ LOOP:
                        failTimes = 0
                        for {
                                select {
-                               case <-r.GetDone():
+                               case <-r.Done():
                                        logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
                                        break LOOP
                                case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk.
diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go
index 58e0d69..175d758 100644
--- a/remoting/zookeeper/facade_test.go
+++ b/remoting/zookeeper/facade_test.go
@@ -24,7 +24,7 @@ import (
        "time"
 )
 import (
-       "github.com/samuel/go-zookeeper/zk"
+       "github.com/dubbogo/go-zookeeper/zk"
        "github.com/stretchr/testify/assert"
 )
 import (
@@ -55,7 +55,7 @@ func (r *mockFacade) WaitGroup() *sync.WaitGroup {
        return &r.wg
 }
 
-func (r *mockFacade) GetDone() chan struct{} {
+func (r *mockFacade) Done() chan struct{} {
        return r.done
 }
 
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 0b9db5e..4493c06 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -19,15 +19,14 @@ package zookeeper
 
 import (
        "path"
-       "strings"
        "sync"
        "time"
 )
 
 import (
        "github.com/dubbogo/getty"
+       "github.com/dubbogo/go-zookeeper/zk"
        perrors "github.com/pkg/errors"
-       "github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -35,6 +34,7 @@ import (
        "github.com/apache/dubbo-go/remoting"
 )
 
+// ZkEventListener ...
 type ZkEventListener struct {
        client      *ZookeeperClient
        pathMapLock sync.Mutex
@@ -42,6 +42,7 @@ type ZkEventListener struct {
        wg          sync.WaitGroup
 }
 
+// NewZkEventListener ...
 func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
        return &ZkEventListener{
                client:  client,
@@ -49,10 +50,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
        }
 }
 
+// SetClient ...
 func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
        l.client = client
 }
 
+// ListenServiceNodeEvent ...
 func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
        l.wg.Add(1)
        defer l.wg.Done()
@@ -107,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
        newChildren, err := l.client.GetChildren(zkPath)
        if err != nil {
-               logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
-               return
+               if err == errNilChildren {
+                       content, _, err := l.client.Conn.Get(zkPath)
+                       if err != nil {
+                               logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", zkPath, perrors.WithStack(err))
+                       } else {
+                               listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
+                       }
+
+               } else {
+                       logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
+               }
        }
 
        // a node was added -- listen the new node
@@ -178,7 +190,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
                        if MaxFailTimes <= failTimes {
                                failTimes = MaxFailTimes
                        }
-                       logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
+                       logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
                        // clear the event channel
                CLEAR:
                        for {
@@ -189,6 +201,11 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
                                }
                        }
                        l.client.RegisterEvent(zkPath, &event)
+                       if err == errNilNode {
+                               logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
+                               l.client.UnregisterEvent(zkPath, &event)
+                               return
+                       }
                        select {
                        case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
                                l.client.UnregisterEvent(zkPath, &event)
@@ -263,56 +280,11 @@ func timeSecondDuration(sec int) time.Duration {
        return time.Duration(sec) * time.Second
 }
 
-// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
+// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
 // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
 //                            |
 //                            --------> ListenServiceNodeEvent
 func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
-       var (
-               err       error
-               dubboPath string
-               children  []string
-       )
-
-       zkPath = strings.ReplaceAll(zkPath, "$", "%24")
-       l.pathMapLock.Lock()
-       _, ok := l.pathMap[zkPath]
-       l.pathMapLock.Unlock()
-       if ok {
-               logger.Warnf("@zkPath %s has already been listened.", zkPath)
-               return
-       }
-
-       l.pathMapLock.Lock()
-       l.pathMap[zkPath] = struct{}{}
-       l.pathMapLock.Unlock()
-
-       logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
-       children, err = l.client.GetChildren(zkPath)
-       if err != nil {
-               children = nil
-               logger.Warnf("fail to get children of zk path{%s}", zkPath)
-       }
-
-       for _, c := range children {
-               // listen l service node
-               dubboPath = path.Join(zkPath, c)
-               content, _, err := l.client.Conn.Get(dubboPath)
-               if err != nil {
-                       logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", dubboPath, perrors.WithStack(err))
-               }
-               if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
-                       continue
-               }
-               logger.Infof("listen dubbo service key{%s}", dubboPath)
-               go func(zkPath string, listener remoting.DataListener) {
-                       if l.ListenServiceNodeEvent(zkPath) {
-                               listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
-                       }
-                       logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
-               }(dubboPath, listener)
-       }
-
        logger.Infof("listen dubbo path{%s}", zkPath)
        go func(zkPath string, listener remoting.DataListener) {
                l.listenDirEvent(zkPath, listener)
@@ -324,6 +296,7 @@ func (l *ZkEventListener) valid() bool {
        return l.client.ZkConnValid()
 }
 
+// Close ...
 func (l *ZkEventListener) Close() {
        l.wg.Wait()
 }
diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go
index aa627c7..43e9aca 100644
--- a/remoting/zookeeper/listener_test.go
+++ b/remoting/zookeeper/listener_test.go
@@ -24,7 +24,7 @@ import (
        "time"
 )
 import (
-       "github.com/samuel/go-zookeeper/zk"
+       "github.com/dubbogo/go-zookeeper/zk"
        "github.com/stretchr/testify/assert"
 )
 import (
@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
        listener := NewZkEventListener(client)
        dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
        listener.ListenServiceEvent("/dubbo", dataListener)
-
+       time.Sleep(1 * time.Second)
        _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
        assert.NoError(t, err)
        wait.Wait()
        assert.Equal(t, changedData, dataListener.eventList[1].Content)
-       client.Close()
 
 }
 

Reply via email to