This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8774688  [Issue:53] Fix concurrent map write (#54)
8774688 is described below

commit 87746885974efe881cedfef1db484f9f659d86b6
Author: 冉小龙 <ranxiaolong...@gmail.com>
AuthorDate: Wed Aug 14 19:11:07 2019 +0800

    [Issue:53] Fix concurrent map write (#54)
    
    * [Issue:53]Fix concurrent map write
    
    Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com>
---
 CONTRIBUTING.md                   | 47 +++++++++-------------
 pulsar/consumer_test.go           | 82 ++++++++++++++++++++++++++++++++++++++-
 pulsar/impl_partition_consumer.go |  1 +
 pulsar/internal/connection.go     |  2 +-
 4 files changed, 102 insertions(+), 30 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7a6e414..d05c687 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -21,23 +21,15 @@
 
 # How to contribute
 
-If you would like to contribute code to this project you can do so through GitHub by forking the repository and sending a pull request.
+If you would like to contribute code to this project, fork the repository and send a pull request.
 
-This document outlines some of the conventions on development workflow, commit message formatting, contact points and other resources to make it easier to get your contribution accepted.
+## Prerequisite
 
-## Steps to Contribute
+If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install).
 
-Since the `go mod` package management tool is used in this project, your go version is required at **Go1.11+**.
+Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required.
 
-### Fork
-
-Before you start contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
-
-### Installation
-
-If you don't currently have a go environment installed,install Go according to the installation instructions here: http://golang.org/doc/install
-
-##### mac os && linux
+### Install Go on Mac OS and Linux
 
 ```bash
 $ mkdir -p $HOME/github.com/apache/
@@ -47,19 +39,21 @@ $ cd pulsar-client-go
 $ go mod download
 ```
 
-When you execute `go mod download`, there may be some libs that cannot be downloaded. You can download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
+If some libs cannot be downloaded when you enter the `go mod download` command, download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/).
+
+## Fork
 
-### Contribution flow
+Before contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository.
+
+## Contribution flow
 
 ```bash
 $ git remote add apache git@github.com:apache/pulsar-client-go.git
-
 // sync with remote master
 $ git checkout master
 $ git fetch apache
 $ git rebase apache/master
 $ git push origin master
-
 // create PR branch
 $ git checkout -b your_branch   
 # do your work, and then
@@ -68,19 +62,16 @@ $ git commit -sm "xxx"
 $ git push origin your_branch
 ```
 
-Thanks for your contributions!
-
-#### Code style
-
-The coding style suggested by the Golang community is used in Apache pulsar-client-go. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details.
+## Code style
 
-Please follow this style to make your pull request easy to review, maintain and develop.
+The coding style suggested by the Golang community is used in Apache pulsar-client-go. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments).
+Follow the style, make your pull request easy to review, maintain and develop.
 
-#### Create new file
+## Create new files
 
-The project uses the open source protocol of Apache License 2.0. When you need to create a new file when developing new features, 
-please add it at the beginning of the file. The location of the header file: [header file](.header).
+The project uses the open source protocol of Apache License 2.0. If you need to create a new file when developing new features, 
+add the license at the beginning of each file. The location of the header file: [header file](.header).
 
-#### Updating dependencies
+## Update dependencies
 
-Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency: use the `go mod edit` command to change the dependency.
+Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency, use the `go mod edit` command to change the dependency.
\ No newline at end of file
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6fe86cd..7cc32ed 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
+       "github.com/apache/pulsar-client-go/util"
        "github.com/stretchr/testify/assert"
        "log"
        "net/http"
@@ -361,7 +362,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        topic := "persistent://public/default/testGetPartitions"
        testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-       makeHTTPCall(t, http.MethodPut, testURL, "5")
+       makeHTTPCall(t, http.MethodPut, testURL, "64")
 
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
@@ -635,3 +636,82 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
                })
        }
 }
+
+func TestConsumer_Shared(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/testMultiPartitionConsumerShared"
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
+
+       makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+       sub := "sub-shared-1"
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sub,
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       msgList := make([]string, 0, 5)
+       for i := 0; i < 5; i++ {
+               msg, err := consumer1.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+               msgList = append(msgList, string(msg.Payload()))
+               if err := consumer1.Ack(msg); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       assert.Equal(t, 5, len(msgList))
+
+       for i := 0; i < 5; i++ {
+               msg, err := consumer2.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               if err := consumer2.Ack(msg); err != nil {
+                       log.Fatal(err)
+               }
+               fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
+               msgList = append(msgList, string(msg.Payload()))
+       }
+
+       assert.Equal(t, 10, len(msgList))
+       res := util.RemoveDuplicateElement(msgList)
+       assert.Equal(t, 10, len(res))
+}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 87cf68b..3a729cd 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -318,6 +318,7 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
                                if err != nil {
                                        return err
                                }
+                               receivedSinceFlow = 0
                                continue
                        }
                        break
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2c707ea..0d7d3d8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -382,13 +382,13 @@ func (c *connection) internalSendRequest(req *request) {
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
        c.mapMutex.RLock()
        request, ok := c.pendingReqs[requestID]
-       c.mapMutex.RUnlock()
        if !ok {
                c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
                return
        }
 
        delete(c.pendingReqs, requestID)
+       c.mapMutex.RUnlock()
        request.callback(response)
 }
 

Reply via email to