This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new a615761 Check `golint`, `gofmt` and `go import` for project (#55) a615761 is described below commit a61576168966d030e71d37a4a1dcd4cda023941b Author: 冉小龙 <ranxiaolong...@gmail.com> AuthorDate: Thu Aug 15 18:01:10 2019 +0800 Check `golint`, `gofmt` and `go import` for project (#55) * code format Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com> * fix seek logic Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com> --- examples/consumer/consumer.go | 77 +++++++++++++++++----------------- examples/producer/producer.go | 65 ++++++++++++++-------------- pkg/compression/compression_test.go | 9 ++-- pkg/compression/lz4.go | 2 +- pulsar/consumer_test.go | 15 +++++-- pulsar/impl_client_test.go | 2 +- pulsar/impl_consumer.go | 11 ++--- pulsar/impl_producer.go | 4 +- pulsar/internal/checksum.go | 22 +++++----- pulsar/internal/checksum_test.go | 51 +++++++++++----------- pulsar/internal/commands.go | 12 +++--- pulsar/internal/connection.go | 8 ++-- pulsar/internal/hash_test.go | 6 ++- pulsar/internal/lookup_service_test.go | 4 +- pulsar/internal/rpc_client.go | 8 ++-- pulsar/producer_test.go | 6 ++- pulsar/test_helper.go | 3 +- pulsar/unackMsgTracker_test.go | 63 ++++++++++++++-------------- util/error.go | 29 +++++++------ util/util_test.go | 7 ++-- 20 files changed, 210 insertions(+), 194 deletions(-) diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go index 3010853..c408386 100644 --- a/examples/consumer/consumer.go +++ b/examples/consumer/consumer.go @@ -18,45 +18,46 @@ package main import ( - "context" - "fmt" - `github.com/apache/pulsar-client-go/pulsar` - `log` + "context" + "fmt" + "log" + + "github.com/apache/pulsar-client-go/pulsar" ) func main() { - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) - if err != nil { - log.Fatal(err) - } - - defer client.Close() - - consumer, err := client.Subscribe(pulsar.ConsumerOptions{ - Topic: "topic-1", - SubscriptionName: "my-sub", - Type: pulsar.Shared, - }) - if err != nil { - log.Fatal(err) - } - defer consumer.Close() - - for i := 0; i < 10; i++ { - msg, err := consumer.Receive(context.Background()) - if err != nil { - log.Fatal(err) - } - - fmt.Printf("Received message msgId: %#v -- content: '%s'\n", - msg.ID(), string(msg.Payload())) - - if err := consumer.Ack(msg); err != nil { - log.Fatal(err) - } - } - - if err := consumer.Unsubscribe(); err != nil { - log.Fatal(err) - } + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) + if err != nil { + log.Fatal(err) + } + + defer client.Close() + + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: "topic-1", + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + log.Fatal(err) + } + defer consumer.Close() + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), string(msg.Payload())) + + if err := consumer.Ack(msg); err != nil { + log.Fatal(err) + } + } + + if err := consumer.Unsubscribe(); err != nil { + log.Fatal(err) + } } diff --git a/examples/producer/producer.go b/examples/producer/producer.go index 56b87b5..24ca58d 100644 --- a/examples/producer/producer.go +++ b/examples/producer/producer.go @@ -18,39 +18,40 @@ package main import ( - `context` - `fmt` - `github.com/apache/pulsar-client-go/pulsar` - `log` + "context" + "fmt" + "log" + + "github.com/apache/pulsar-client-go/pulsar" ) func main() { - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: "pulsar://localhost:6650", - }) - - if err != nil { - log.Fatal(err) - } - - defer client.Close() - - producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: "topic-1", - }) - if err != nil { - log.Fatal(err) - } - - defer producer.Close() - - ctx := context.Background() - - for i := 0; i < 10; i++ { - if err := producer.Send(ctx, &pulsar.ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", i)), - }); err != nil { - log.Fatal(err) - } - } + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + if err != nil { + log.Fatal(err) + } + + defer client.Close() + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: "topic-1", + }) + if err != nil { + log.Fatal(err) + } + + defer producer.Close() + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + log.Fatal(err) + } + } } diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go index 98d4509..09fe359 100644 --- a/pkg/compression/compression_test.go +++ b/pkg/compression/compression_test.go @@ -38,7 +38,8 @@ var providers = []testProvider{ } func TestCompression(t *testing.T) { - for _, p := range providers { + for _, provider := range providers { + p := provider t.Run(p.name, func(t *testing.T) { if !p.provider.CanCompress() { return @@ -54,7 +55,8 @@ func TestCompression(t *testing.T) { } func TestJavaCompatibility(t *testing.T) { - for _, p := range providers { + for _, provider := range providers { + p := provider t.Run(p.name, func(t *testing.T) { hello := []byte("hello") uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello)) @@ -65,7 +67,8 @@ func TestJavaCompatibility(t *testing.T) { } func TestDecompressionError(t *testing.T) { - for _, p := range providers { + for _, provider := range providers { + p := provider t.Run(p.name, func(t *testing.T) { _, err := p.provider.Decompress([]byte{0x05}, 10) assert.NotNil(t, err) diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go index b391336..d252057 100644 --- a/pkg/compression/lz4.go +++ b/pkg/compression/lz4.go @@ -21,7 +21,7 @@ import ( "github.com/pierrec/lz4" ) -type lz4Provider struct {} +type lz4Provider struct{} // NewLz4Provider return a interface of Provider. func NewLz4Provider() Provider { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 7cc32ed..97b4264 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -20,13 +20,14 @@ package pulsar import ( "context" "fmt" - "github.com/apache/pulsar-client-go/util" - "github.com/stretchr/testify/assert" "log" "net/http" "strings" "testing" "time" + + "github.com/apache/pulsar-client-go/util" + "github.com/stretchr/testify/assert" ) var ( @@ -154,6 +155,7 @@ func TestBatchMessageReceive(t *testing.T) { Topic: topicName, SubscriptionName: subName, }) + assert.Nil(t, err) assert.Equal(t, topicName, consumer.Topic()) count := 0 @@ -429,17 +431,19 @@ func TestConsumer_ReceiveAsync(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) + assert.Nil(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: subName, }) + assert.Nil(t, err) defer consumer.Close() //send 10 messages for i := 0; i < 10; i++ { - err := producer.Send(ctx, &ProducerMessage{ + err = producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) assert.Nil(t, err) @@ -610,12 +614,14 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) + assert.Nil(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: subName, }) + assert.Nil(t, err) defer consumer.Close() //send 10 messages @@ -627,12 +633,13 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) { } for i := 0; i < 10; i++ { + tmpMsg := fmt.Sprintf("hello-%d", i) consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) { if err != nil { log.Fatal(err) } fmt.Printf("receive message payload is:%s\n", string(msg.Payload())) - assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) + assert.Equal(t, tmpMsg, string(msg.Payload())) }) } } diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go index 7f6c4fc..55e227f 100644 --- a/pulsar/impl_client_test.go +++ b/pulsar/impl_client_test.go @@ -29,7 +29,7 @@ func TestClient(t *testing.T) { client, err := NewClient(ClientOptions{}) assert.Nil(t, client) assert.NotNil(t, err) - assert.Equal(t, Result(ResultInvalidConfiguration), err.(*Error).Result()) + assert.Equal(t, ResultInvalidConfiguration, err.(*Error).Result()) } func TestTLSConnectionCAError(t *testing.T) { diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go index 13e72ae..1f800b5 100644 --- a/pulsar/impl_consumer.go +++ b/pulsar/impl_consumer.go @@ -101,9 +101,9 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string for partitionIdx, partitionTopic := range partitions { go func(partitionIdx int, partitionTopic string) { - cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue) + cons, e := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue) ch <- ConsumerError{ - err: err, + err: e, partition: partitionIdx, cons: cons, } @@ -268,9 +268,10 @@ func (c *consumer) Seek(msgID MessageID) error { partition := id.GetPartition() - if partition < 0 { - return errors.New("invalid partition index") - } + // current topic is non-partition topic, we only need to get the first value in the consumers. + if partition < 0 { + partition = 0 + } return c.consumers[partition].Seek(msgID) } diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go index 169d844..5606f86 100644 --- a/pulsar/impl_producer.go +++ b/pulsar/impl_producer.go @@ -79,11 +79,11 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { for partitionIdx, partition := range partitions { go func(partitionIdx int, partition string) { - prod, err := newPartitionProducer(client, partition, options, partitionIdx) + prod, e := newPartitionProducer(client, partition, options, partitionIdx) c <- ProducerError{ partition: partitionIdx, prod: prod, - err: err, + err: e, } }(partitionIdx, partition) } diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go index 3fc37ae..e7cf787 100644 --- a/pulsar/internal/checksum.go +++ b/pulsar/internal/checksum.go @@ -18,8 +18,8 @@ package internal import ( - `hash` - `hash/crc32` + "hash" + "hash/crc32" ) // crc32cTable holds the precomputed crc32 hash table @@ -27,7 +27,7 @@ import ( var crc32cTable = crc32.MakeTable(crc32.Castagnoli) type CheckSum struct { - hash hash.Hash + hash hash.Hash } // Crc32cCheckSum handles computing the checksum. @@ -36,15 +36,15 @@ func Crc32cCheckSum(data []byte) uint32 { } func (cs *CheckSum) Write(p []byte) (int, error) { - if cs.hash == nil { - cs.hash = crc32.New(crc32cTable) - } - return cs.hash.Write(p) + if cs.hash == nil { + cs.hash = crc32.New(crc32cTable) + } + return cs.hash.Write(p) } func (cs *CheckSum) compute() []byte { - if cs.hash == nil { - return nil - } - return cs.hash.Sum(nil) + if cs.hash == nil { + return nil + } + return cs.hash.Sum(nil) } diff --git a/pulsar/internal/checksum_test.go b/pulsar/internal/checksum_test.go index 23dc621..c02edcf 100644 --- a/pulsar/internal/checksum_test.go +++ b/pulsar/internal/checksum_test.go @@ -1,4 +1,3 @@ -// // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -15,37 +14,35 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -// package internal import ( - "bytes" - "hash/crc32" - "testing" + "bytes" + "hash/crc32" + "testing" ) func TestFrameChecksum(t *testing.T) { - input := []byte{1, 2, 3, 4, 5} - var f CheckSum - - if got := f.compute(); got != nil { - t.Fatalf("compute() = %v; expected nil", got) - } - - if _, err := f.Write(input); err != nil { - t.Fatalf("Write() err = %v; expected nil", err) - } - - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - if _, err := h.Write(input); err != nil { - t.Fatal(err) - } - - if got, expected := f.compute(), h.Sum(nil); !bytes.Equal(got, expected) { - t.Fatalf("compute() = %x; expected %x", got, expected) - } else { - t.Logf("compute() = 0x%x", got) - } + input := []byte{1, 2, 3, 4, 5} + var f CheckSum + + if got := f.compute(); got != nil { + t.Fatalf("compute() = %v; expected nil", got) + } + + if _, err := f.Write(input); err != nil { + t.Fatalf("Write() err = %v; expected nil", err) + } + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + if _, err := h.Write(input); err != nil { + t.Fatal(err) + } + + if got, expected := f.compute(), h.Sum(nil); !bytes.Equal(got, expected) { + t.Fatalf("compute() = %x; expected %x", got, expected) + } else { + t.Logf("compute() = 0x%x", got) + } } - diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 1f29f7f..2800b92 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -21,10 +21,10 @@ import ( "bytes" "encoding/binary" "fmt" - "github.com/golang/protobuf/proto" "io" "github.com/apache/pulsar-client-go/pkg/pb" + "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -172,9 +172,9 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa return nil, nil, err } - singleMessages, err := decodeBatchPayload(payloads, numMsg) - if err != nil { - return nil, nil, err + singleMessages, e := decodeBatchPayload(payloads, numMsg) + if e != nil { + return nil, nil, e } payloadList = make([][]byte, 0, numMsg) @@ -185,7 +185,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa payloadList = append(payloadList, singleMsg.SinglePayload) } - if err := computeChecksum(chksum, expectedChksum); err != nil { + if err = computeChecksum(chksum, expectedChksum); err != nil { return nil, nil, err } return msgMeta, payloadList, nil @@ -207,7 +207,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa payloadList = append(payloadList, payload) } - if err := computeChecksum(chksum, expectedChksum); err != nil { + if err = computeChecksum(chksum, expectedChksum); err != nil { return nil, nil, err } diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 0d7d3d8..2886a7c 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -404,15 +404,15 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error { c.log.Debug("Got Message: ", response) - consumerId := response.GetConsumerId() - if consumer, ok := c.connWrapper.Consumers[consumerId]; ok { + consumerID := response.GetConsumerId() + if consumer, ok := c.connWrapper.Consumers[consumerID]; ok { err := consumer.MessageReceived(response, payload) if err != nil { - c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId) + c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId) return errors.New("handler not found") } } else { - c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId) + c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId) } return nil } diff --git a/pulsar/internal/hash_test.go b/pulsar/internal/hash_test.go index 2f9bc19..5e103cb 100644 --- a/pulsar/internal/hash_test.go +++ b/pulsar/internal/hash_test.go @@ -42,7 +42,8 @@ var murmurHashValues = []testProvider{ } func TestJavaHash(t *testing.T) { - for _, p := range javaHashValues { + for _, javaHashValue := range javaHashValues { + p := javaHashValue t.Run(p.str, func(t *testing.T) { assert.Equal(t, p.hash, JavaStringHash(p.str)) }) @@ -50,7 +51,8 @@ func TestJavaHash(t *testing.T) { } func TestMurmurHash(t *testing.T) { - for _, p := range murmurHashValues { + for _, murmurHashValue := range murmurHashValues { + p := murmurHashValue t.Run(p.str, func(t *testing.T) { assert.Equal(t, p.hash, Murmur3_32Hash(p.str)) }) diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index de7f058..0c548bc 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -95,8 +95,8 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType return nil, nil } -func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { +func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, + message proto.Message) (*RPCResult, error) { assert.Fail(c.t, "Shouldn't be called") return nil, nil } diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 2b68a71..66e8a79 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -45,7 +45,7 @@ type RPCClient interface { Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) - RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) + RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) } @@ -112,13 +112,13 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba return rpcResult, nil } -func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { +func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, + message proto.Message) (*RPCResult, error) { rpcResult := &RPCResult{ Cnx: cnx, } - cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) { + cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) { rpcResult.Response = response }) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index e9327d0..391c38f 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -169,7 +169,8 @@ func TestProducerCompression(t *testing.T) { {"zstd", ZSTD}, } - for _, p := range providers { + for _, provider := range providers { + p := provider t.Run(p.name, func(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, @@ -290,6 +291,7 @@ func TestFlushInProducer(t *testing.T) { "producer-id": "test-producer-id", }, }) + assert.Nil(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -325,7 +327,7 @@ func TestFlushInProducer(t *testing.T) { wg.Wait() for i := 0; i < numOfMessages/2; i++ { - _, err := consumer.Receive(ctx) + _, err = consumer.Receive(ctx) assert.Nil(t, err) msgCount++ } diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go index 1329be1..9930a99 100644 --- a/pulsar/test_helper.go +++ b/pulsar/test_helper.go @@ -57,8 +57,9 @@ func httpPut(url string, body interface{}) { "Content-Type": {"application/json"}, } - _, err = client.Do(req) + resp, err := client.Do(req) if err != nil { log.Fatal(err) } + resp.Body.Close() } diff --git a/pulsar/unackMsgTracker_test.go b/pulsar/unackMsgTracker_test.go index d292fc8..edf7ddc 100644 --- a/pulsar/unackMsgTracker_test.go +++ b/pulsar/unackMsgTracker_test.go @@ -18,47 +18,48 @@ package pulsar import ( - `github.com/apache/pulsar-client-go/pkg/pb` - `github.com/golang/protobuf/proto` - `github.com/stretchr/testify/assert` - `testing` + "testing" + + "github.com/apache/pulsar-client-go/pkg/pb" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" ) func TestUnackedMessageTracker(t *testing.T) { - unAckTracker := NewUnackedMessageTracker() + unAckTracker := NewUnackedMessageTracker() - var msgIDs []*pb.MessageIdData + var msgIDs []*pb.MessageIdData - for i := 0; i < 5; i++ { - msgID := &pb.MessageIdData{ - LedgerId: proto.Uint64(1), - EntryId: proto.Uint64(uint64(i)), - Partition: proto.Int32(-1), - BatchIndex: proto.Int32(-1), - } + for i := 0; i < 5; i++ { + msgID := &pb.MessageIdData{ + LedgerId: proto.Uint64(1), + EntryId: proto.Uint64(uint64(i)), + Partition: proto.Int32(-1), + BatchIndex: proto.Int32(-1), + } - msgIDs = append(msgIDs, msgID) - } + msgIDs = append(msgIDs, msgID) + } - for _, msgID := range msgIDs { - ok := unAckTracker.Add(msgID) - assert.True(t, ok) - } + for _, msgID := range msgIDs { + ok := unAckTracker.Add(msgID) + assert.True(t, ok) + } - flag := unAckTracker.IsEmpty() - assert.False(t, flag) + flag := unAckTracker.IsEmpty() + assert.False(t, flag) - num := unAckTracker.Size() - assert.Equal(t, num, 5) + num := unAckTracker.Size() + assert.Equal(t, num, 5) - for index, msgID := range msgIDs { - unAckTracker.Remove(msgID) - assert.Equal(t, 4-index, unAckTracker.Size()) - } + for index, msgID := range msgIDs { + unAckTracker.Remove(msgID) + assert.Equal(t, 4-index, unAckTracker.Size()) + } - num = unAckTracker.Size() - assert.Equal(t, num, 0) + num = unAckTracker.Size() + assert.Equal(t, num, 0) - flag = unAckTracker.IsEmpty() - assert.True(t, flag) + flag = unAckTracker.IsEmpty() + assert.True(t, flag) } diff --git a/util/error.go b/util/error.go index 6a051f5..755b7c0 100644 --- a/util/error.go +++ b/util/error.go @@ -1,4 +1,3 @@ -// // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -15,36 +14,36 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -// package util import ( - `fmt` - `github.com/apache/pulsar-client-go/pkg/pb` + "fmt" + + "github.com/apache/pulsar-client-go/pkg/pb" ) // NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error. // Optionally provide a list of IDs associated with the message // for additional context in the error message. func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg { - return &UnexpectedErrMsg{ - msgType: msgType, - ids: ids, - } + return &UnexpectedErrMsg{ + msgType: msgType, + ids: ids, + } } // UnexpectedErrMsg is returned when an unexpected message is received. type UnexpectedErrMsg struct { - msgType pb.BaseCommand_Type - ids []interface{} + msgType pb.BaseCommand_Type + ids []interface{} } // Error satisfies the error interface. func (e *UnexpectedErrMsg) Error() string { - msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String()) - for _, id := range e.ids { - msg += fmt.Sprintf(" id=%v", id) - } - return msg + msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String()) + for _, id := range e.ids { + msg += fmt.Sprintf(" id=%v", id) + } + return msg } diff --git a/util/util_test.go b/util/util_test.go index 284dd0c..3b0a9f9 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -19,13 +19,14 @@ package util import ( "fmt" - "github.com/stretchr/testify/assert" "strings" "testing" + + "github.com/stretchr/testify/assert" ) func TestIsNil(t *testing.T) { - var a interface{} = nil + var a interface{} var b interface{} = (*int)(nil) assert.True(t, a == nil) @@ -35,6 +36,6 @@ func TestIsNil(t *testing.T) { func TestRemoveDuplicateElement(t *testing.T) { s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"} resList := RemoveDuplicateElement(s) - res := fmt.Sprintf("%s", resList[:]) + res := fmt.Sprintf("%s", resList) assert.Equal(t, 1, strings.Count(res, "hello")) }