This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 8774688 [Issue:53] Fix concurrent map write (#54) 8774688 is described below commit 87746885974efe881cedfef1db484f9f659d86b6 Author: 冉小龙 <ranxiaolong...@gmail.com> AuthorDate: Wed Aug 14 19:11:07 2019 +0800 [Issue:53] Fix concurrent map write (#54) * [Issue:53]Fix concurrent map write Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com> --- CONTRIBUTING.md | 47 +++++++++------------- pulsar/consumer_test.go | 82 ++++++++++++++++++++++++++++++++++++++- pulsar/impl_partition_consumer.go | 1 + pulsar/internal/connection.go | 2 +- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7a6e414..d05c687 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,23 +21,15 @@ # How to contribute -If you would like to contribute code to this project you can do so through GitHub by forking the repository and sending a pull request. +If you would like to contribute code to this project, fork the repository and send a pull request. -This document outlines some of the conventions on development workflow, commit message formatting, contact points and other resources to make it easier to get your contribution accepted. +## Prerequisite -## Steps to Contribute +If you have not installed Go, install it according to the [installation instruction](http://golang.org/doc/install). -Since the `go mod` package management tool is used in this project, your go version is required at **Go1.11+**. +Since the `go mod` package management tool is used in this project, **Go 1.11 or higher** version is required. -### Fork - -Before you start contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository. 
- -### Installation - -If you don't currently have a go environment installed,install Go according to the installation instructions here: http://golang.org/doc/install - -##### mac os && linux +### Install Go on Mac OS and Linux ```bash $ mkdir -p $HOME/github.com/apache/ @@ -47,19 +39,21 @@ $ cd pulsar-client-go $ go mod download ``` -When you execute `go mod download`, there may be some libs that cannot be downloaded. You can download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/). +If some libs cannot be downloaded when you enter the `go mod download` command, download them by referring to the proxy provided by [GOPROXY.io](https://goproxy.io/). + +## Fork -### Contribution flow +Before contributing, you need to fork [pulsar-client-go](https://github.com/apache/pulsar) to your github repository. + +## Contribution flow ```bash $ git remote add apache git@github.com:apache/pulsar-client-go.git - // sync with remote master $ git checkout master $ git fetch apache $ git rebase apache/master $ git push origin master - // create PR branch $ git checkout -b your_branch # do your work, and then @@ -68,19 +62,16 @@ $ git commit -sm "xxx" $ git push origin your_branch ``` -Thanks for your contributions! - -#### Code style - -The coding style suggested by the Golang community is used in Apache pulsar-client-go. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details. +## Code style -Please follow this style to make your pull request easy to review, maintain and develop. +The coding style suggested by the Golang community is used in Apache pulsar-client-go. For details, refer to [style doc](https://github.com/golang/go/wiki/CodeReviewComments). +Follow the style, make your pull request easy to review, maintain and develop. -#### Create new file +## Create new files -The project uses the open source protocol of Apache License 2.0. 
When you need to create a new file when developing new features, -please add it at the beginning of the file. The location of the header file: [header file](.header). +The project uses the open source protocol of Apache License 2.0. If you need to create a new file when developing new features, +add the license at the beginning of each file. The location of the header file: [header file](.header). -#### Updating dependencies +## Update dependencies -Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency: use the `go mod edit` command to change the dependency. +Apache `pulsar-client-go` uses [Go 1.11 module](https://github.com/golang/go/wiki/Modules) to manage dependencies. To add or update a dependency, use the `go mod edit` command to change the dependency. \ No newline at end of file diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 6fe86cd..7cc32ed 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "github.com/apache/pulsar-client-go/util" "github.com/stretchr/testify/assert" "log" "net/http" @@ -361,7 +362,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { topic := "persistent://public/default/testGetPartitions" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions" - makeHTTPCall(t, http.MethodPut, testURL, "5") + makeHTTPCall(t, http.MethodPut, testURL, "64") // create producer producer, err := client.CreateProducer(ProducerOptions{ @@ -635,3 +636,82 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) { }) } } + +func TestConsumer_Shared(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/testMultiPartitionConsumerShared" + testURL := adminURL + "/" + 
"admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions" + + makeHTTPCall(t, http.MethodPut, testURL, "3") + + sub := "sub-shared-1" + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sub, + Type: Shared, + }) + assert.Nil(t, err) + defer consumer1.Close() + + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sub, + Type: Shared, + }) + assert.Nil(t, err) + defer consumer2.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + log.Fatal(err) + } + } + + msgList := make([]string, 0, 5) + for i := 0; i < 5; i++ { + msg, err := consumer1.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload())) + msgList = append(msgList, string(msg.Payload())) + if err := consumer1.Ack(msg); err != nil { + log.Fatal(err) + } + } + + assert.Equal(t, 5, len(msgList)) + + for i := 0; i < 5; i++ { + msg, err := consumer2.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + if err := consumer2.Ack(msg); err != nil { + log.Fatal(err) + } + fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload())) + msgList = append(msgList, string(msg.Payload())) + } + + assert.Equal(t, 10, len(msgList)) + res := util.RemoveDuplicateElement(msgList) + assert.Equal(t, 10, len(res)) +} diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go index 87cf68b..3a729cd 100644 --- a/pulsar/impl_partition_consumer.go +++ b/pulsar/impl_partition_consumer.go @@ -318,6 +318,7 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, 
msgs chan<- Consu if err != nil { return err } + receivedSinceFlow = 0 continue } break diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 2c707ea..0d7d3d8 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -382,13 +382,13 @@ func (c *connection) internalSendRequest(req *request) { func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) { c.mapMutex.RLock() request, ok := c.pendingReqs[requestID] - c.mapMutex.RUnlock() if !ok { c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type) return } delete(c.pendingReqs, requestID) + c.mapMutex.RUnlock() request.callback(response) }