This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4833a6f  Fix:Lock bug
     new 569562c  Merge pull request #106 from fangyincheng/services
4833a6f is described below

commit 4833a6f483e8c63ec343f89307dac82c28c4caad
Author: fangyincheng <fangyinch...@sina.com>
AuthorDate: Thu Jun 27 17:42:03 2019 +0800

    Fix:Lock bug
---
 protocol/dubbo/client.go     | 24 ++++++------------------
 protocol/dubbo/codec.go      |  6 +++---
 protocol/dubbo/readwriter.go |  2 --
 3 files changed, 9 insertions(+), 23 deletions(-)

diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index d6155b6..72abcda 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -148,14 +148,13 @@ type Client struct {
        pool     *gettyRPCClientPool
        sequence atomic.Uint64
 
-       pendingLock      sync.RWMutex
-       pendingResponses map[SequenceType]*PendingResponse
+       pendingResponses *sync.Map
 }
 
 func NewClient() *Client {
 
        c := &Client{
-               pendingResponses: make(map[SequenceType]*PendingResponse),
+               pendingResponses: new(sync.Map),
                conf:             *clientConf,
        }
        c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, 
time.Duration(int(time.Second)*clientConf.PoolTTL))
@@ -201,13 +200,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, 
method string, args i
        return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, 
reply, callback, copts))
 }
 
-func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse {
-       c.pendingLock.RLock()
-       defer c.pendingLock.RUnlock()
-
-       return c.pendingResponses[SequenceType(seq)]
-}
-
 func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method 
string,
        args, reply interface{}, callback AsyncCallback, opts CallOptions) 
error {
 
@@ -330,20 +322,16 @@ func (c *Client) transfer(session getty.Session, pkg 
*DubboPackage,
 }
 
 func (c *Client) addPendingResponse(pr *PendingResponse) {
-       c.pendingLock.Lock()
-       defer c.pendingLock.Unlock()
-       c.pendingResponses[SequenceType(pr.seq)] = pr
+       c.pendingResponses.Store(SequenceType(pr.seq), pr)
 }
 
 func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
-       c.pendingLock.Lock()
-       defer c.pendingLock.Unlock()
        if c.pendingResponses == nil {
                return nil
        }
-       if presp, ok := c.pendingResponses[seq]; ok {
-               delete(c.pendingResponses, seq)
-               return presp
+       if presp, ok := c.pendingResponses.Load(seq); ok {
+               c.pendingResponses.Delete(seq)
+               return presp.(*PendingResponse)
        }
        return nil
 }
diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go
index 9551c2b..e9ced84 100644
--- a/protocol/dubbo/codec.go
+++ b/protocol/dubbo/codec.go
@@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts 
...interface{}) error {
                        return perrors.Errorf("opts[0] is not of type *Client")
                }
 
-               pendingRsp := 
client.GetPendingResponse(SequenceType(p.Header.ID))
-               if pendingRsp == nil {
+               pendingRsp, ok := 
client.pendingResponses.Load(SequenceType(p.Header.ID))
+               if !ok {
                        return perrors.Errorf("client.GetPendingResponse(%v) = 
nil", p.Header.ID)
                } else {
-                       p.Body = &hessian.Response{RspObj: pendingRsp.reply}
+                       p.Body = &hessian.Response{RspObj: 
pendingRsp.(*PendingResponse).reply}
                }
        }
 
diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go
index 529aa75..042b878 100644
--- a/protocol/dubbo/readwriter.go
+++ b/protocol/dubbo/readwriter.go
@@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) 
*RpcClientPackageHandler {
 }
 
 func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) 
(interface{}, int, error) {
-       p.client.pendingLock.RLock()
-       defer p.client.pendingLock.RUnlock()
        pkg := &DubboPackage{}
 
        buf := bytes.NewBuffer(data)

Reply via email to