This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a615761  Check `golint`, `gofmt` and `go import` for project (#55)
a615761 is described below

commit a61576168966d030e71d37a4a1dcd4cda023941b
Author: 冉小龙 <ranxiaolong...@gmail.com>
AuthorDate: Thu Aug 15 18:01:10 2019 +0800

    Check `golint`, `gofmt` and `go import` for project (#55)
    
    * code format
    
    Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com>
    
    * fix seek logic
    
    Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com>
---
 examples/consumer/consumer.go          | 77 +++++++++++++++++-----------------
 examples/producer/producer.go          | 65 ++++++++++++++--------------
 pkg/compression/compression_test.go    |  9 ++--
 pkg/compression/lz4.go                 |  2 +-
 pulsar/consumer_test.go                | 15 +++++--
 pulsar/impl_client_test.go             |  2 +-
 pulsar/impl_consumer.go                | 11 ++---
 pulsar/impl_producer.go                |  4 +-
 pulsar/internal/checksum.go            | 22 +++++-----
 pulsar/internal/checksum_test.go       | 51 +++++++++++-----------
 pulsar/internal/commands.go            | 12 +++---
 pulsar/internal/connection.go          |  8 ++--
 pulsar/internal/hash_test.go           |  6 ++-
 pulsar/internal/lookup_service_test.go |  4 +-
 pulsar/internal/rpc_client.go          |  8 ++--
 pulsar/producer_test.go                |  6 ++-
 pulsar/test_helper.go                  |  3 +-
 pulsar/unackMsgTracker_test.go         | 63 ++++++++++++++--------------
 util/error.go                          | 29 +++++++------
 util/util_test.go                      |  7 ++--
 20 files changed, 210 insertions(+), 194 deletions(-)

diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 3010853..c408386 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -18,45 +18,46 @@
 package main
 
 import (
-    "context"
-    "fmt"
-    `github.com/apache/pulsar-client-go/pulsar`
-    `log`
+       "context"
+       "fmt"
+       "log"
+
+    "github.com/apache/pulsar-client-go/pulsar"
 )
 
 func main() {
-    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
-    if err != nil {
-        log.Fatal(err)
-    }
-
-    defer client.Close()
-
-    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
-        Topic:            "topic-1",
-        SubscriptionName: "my-sub",
-        Type:             pulsar.Shared,
-    })
-    if err != nil {
-        log.Fatal(err)
-    }
-    defer consumer.Close()
-
-    for i := 0; i < 10; i++ {
-        msg, err := consumer.Receive(context.Background())
-        if err != nil {
-            log.Fatal(err)
-        }
-
-        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
-            msg.ID(), string(msg.Payload()))
-
-        if err := consumer.Ack(msg); err != nil {
-            log.Fatal(err)
-        }
-    }
-
-    if err := consumer.Unsubscribe(); err != nil {
-        log.Fatal(err)
-    }
+       client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer client.Close()
+
+       consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+               Topic:            "topic-1",
+               SubscriptionName: "my-sub",
+               Type:             pulsar.Shared,
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       defer consumer.Close()
+
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+                       msg.ID(), string(msg.Payload()))
+
+               if err := consumer.Ack(msg); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       if err := consumer.Unsubscribe(); err != nil {
+               log.Fatal(err)
+       }
 }
diff --git a/examples/producer/producer.go b/examples/producer/producer.go
index 56b87b5..24ca58d 100644
--- a/examples/producer/producer.go
+++ b/examples/producer/producer.go
@@ -18,39 +18,40 @@
 package main
 
 import (
-    `context`
-    `fmt`
-    `github.com/apache/pulsar-client-go/pulsar`
-    `log`
+       "context"
+       "fmt"
+       "log"
+
+       "github.com/apache/pulsar-client-go/pulsar"
 )
 
 func main() {
-    client, err := pulsar.NewClient(pulsar.ClientOptions{
-        URL: "pulsar://localhost:6650",
-    })
-
-    if err != nil {
-        log.Fatal(err)
-    }
-
-    defer client.Close()
-
-    producer, err := client.CreateProducer(pulsar.ProducerOptions{
-        Topic: "topic-1",
-    })
-    if err != nil {
-        log.Fatal(err)
-    }
-
-    defer producer.Close()
-
-    ctx := context.Background()
-
-    for i := 0; i < 10; i++ {
-        if err := producer.Send(ctx, &pulsar.ProducerMessage{
-            Payload: []byte(fmt.Sprintf("hello-%d", i)),
-        }); err != nil {
-            log.Fatal(err)
-        }
-    }
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer client.Close()
+
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: "topic-1",
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer producer.Close()
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(ctx, &pulsar.ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
 }
diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go
index 98d4509..09fe359 100644
--- a/pkg/compression/compression_test.go
+++ b/pkg/compression/compression_test.go
@@ -38,7 +38,8 @@ var providers = []testProvider{
 }
 
 func TestCompression(t *testing.T) {
-       for _, p := range providers {
+       for _, provider := range providers {
+               p := provider
                t.Run(p.name, func(t *testing.T) {
                        if !p.provider.CanCompress() {
                                return
@@ -54,7 +55,8 @@ func TestCompression(t *testing.T) {
 }
 
 func TestJavaCompatibility(t *testing.T) {
-       for _, p := range providers {
+       for _, provider := range providers {
+               p := provider
                t.Run(p.name, func(t *testing.T) {
                        hello := []byte("hello")
                        uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
@@ -65,7 +67,8 @@ func TestJavaCompatibility(t *testing.T) {
 }
 
 func TestDecompressionError(t *testing.T) {
-       for _, p := range providers {
+       for _, provider := range providers {
+               p := provider
                t.Run(p.name, func(t *testing.T) {
                        _, err := p.provider.Decompress([]byte{0x05}, 10)
                        assert.NotNil(t, err)
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index b391336..d252057 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -21,7 +21,7 @@ import (
        "github.com/pierrec/lz4"
 )
 
-type lz4Provider struct {}
+type lz4Provider struct{}
 
 // NewLz4Provider return a interface of Provider.
 func NewLz4Provider() Provider {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 7cc32ed..97b4264 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,13 +20,14 @@ package pulsar
 import (
        "context"
        "fmt"
-       "github.com/apache/pulsar-client-go/util"
-       "github.com/stretchr/testify/assert"
        "log"
        "net/http"
        "strings"
        "testing"
        "time"
+
+       "github.com/apache/pulsar-client-go/util"
+       "github.com/stretchr/testify/assert"
 )
 
 var (
@@ -154,6 +155,7 @@ func TestBatchMessageReceive(t *testing.T) {
                Topic:            topicName,
                SubscriptionName: subName,
        })
+       assert.Nil(t, err)
        assert.Equal(t, topicName, consumer.Topic())
        count := 0
 
@@ -429,17 +431,19 @@ func TestConsumer_ReceiveAsync(t *testing.T) {
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topicName,
        })
+       assert.Nil(t, err)
        defer producer.Close()
 
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topicName,
                SubscriptionName: subName,
        })
+       assert.Nil(t, err)
        defer consumer.Close()
 
        //send 10 messages
        for i := 0; i < 10; i++ {
-               err := producer.Send(ctx, &ProducerMessage{
+               err = producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                assert.Nil(t, err)
@@ -610,12 +614,14 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topicName,
        })
+       assert.Nil(t, err)
        defer producer.Close()
 
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topicName,
                SubscriptionName: subName,
        })
+       assert.Nil(t, err)
        defer consumer.Close()
 
        //send 10 messages
@@ -627,12 +633,13 @@ func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
        }
 
        for i := 0; i < 10; i++ {
+               tmpMsg := fmt.Sprintf("hello-%d", i)
                consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
                        if err != nil {
                                log.Fatal(err)
                        }
                        fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
-                       assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+                       assert.Equal(t, tmpMsg, string(msg.Payload()))
                })
        }
 }
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index 7f6c4fc..55e227f 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -29,7 +29,7 @@ func TestClient(t *testing.T) {
        client, err := NewClient(ClientOptions{})
        assert.Nil(t, client)
        assert.NotNil(t, err)
-       assert.Equal(t, Result(ResultInvalidConfiguration), err.(*Error).Result())
+       assert.Equal(t, ResultInvalidConfiguration, err.(*Error).Result())
 }
 
 func TestTLSConnectionCAError(t *testing.T) {
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 13e72ae..1f800b5 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -101,9 +101,9 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string
 
        for partitionIdx, partitionTopic := range partitions {
                go func(partitionIdx int, partitionTopic string) {
-                       cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
+                       cons, e := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
                        ch <- ConsumerError{
-                               err:       err,
+                               err:       e,
                                partition: partitionIdx,
                                cons:      cons,
                        }
@@ -268,9 +268,10 @@ func (c *consumer) Seek(msgID MessageID) error {
 
        partition := id.GetPartition()
 
-       if partition < 0 {
-               return errors.New("invalid partition index")
-       }
+    // current topic is non-partition topic, we only need to get the first value in the consumers.
+    if partition < 0 {
+        partition = 0
+    }
        return c.consumers[partition].Seek(msgID)
 }
 
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 169d844..5606f86 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -79,11 +79,11 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 
        for partitionIdx, partition := range partitions {
                go func(partitionIdx int, partition string) {
-                       prod, err := newPartitionProducer(client, partition, options, partitionIdx)
+                       prod, e := newPartitionProducer(client, partition, options, partitionIdx)
                        c <- ProducerError{
                                partition: partitionIdx,
                                prod:      prod,
-                               err:       err,
+                               err:       e,
                        }
                }(partitionIdx, partition)
        }
diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go
index 3fc37ae..e7cf787 100644
--- a/pulsar/internal/checksum.go
+++ b/pulsar/internal/checksum.go
@@ -18,8 +18,8 @@
 package internal
 
 import (
-    `hash`
-    `hash/crc32`
+       "hash"
+       "hash/crc32"
 )
 
 // crc32cTable holds the precomputed crc32 hash table
@@ -27,7 +27,7 @@ import (
 var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
 
 type CheckSum struct {
-    hash hash.Hash
+       hash hash.Hash
 }
 
 // Crc32cCheckSum handles computing the checksum.
@@ -36,15 +36,15 @@ func Crc32cCheckSum(data []byte) uint32 {
 }
 
 func (cs *CheckSum) Write(p []byte) (int, error) {
-    if cs.hash == nil {
-        cs.hash = crc32.New(crc32cTable)
-    }
-    return cs.hash.Write(p)
+       if cs.hash == nil {
+               cs.hash = crc32.New(crc32cTable)
+       }
+       return cs.hash.Write(p)
 }
 
 func (cs *CheckSum) compute() []byte {
-    if cs.hash == nil {
-        return nil
-    }
-    return cs.hash.Sum(nil)
+       if cs.hash == nil {
+               return nil
+       }
+       return cs.hash.Sum(nil)
 }
diff --git a/pulsar/internal/checksum_test.go b/pulsar/internal/checksum_test.go
index 23dc621..c02edcf 100644
--- a/pulsar/internal/checksum_test.go
+++ b/pulsar/internal/checksum_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,37 +14,35 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
 import (
-    "bytes"
-    "hash/crc32"
-    "testing"
+       "bytes"
+       "hash/crc32"
+       "testing"
 )
 
 func TestFrameChecksum(t *testing.T) {
-    input := []byte{1, 2, 3, 4, 5}
-    var f CheckSum
-
-    if got := f.compute(); got != nil {
-        t.Fatalf("compute() = %v; expected nil", got)
-    }
-
-    if _, err := f.Write(input); err != nil {
-        t.Fatalf("Write() err = %v; expected nil", err)
-    }
-
-    h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-    if _, err := h.Write(input); err != nil {
-        t.Fatal(err)
-    }
-
-    if got, expected := f.compute(), h.Sum(nil); !bytes.Equal(got, expected) {
-        t.Fatalf("compute() = %x; expected %x", got, expected)
-    } else {
-        t.Logf("compute() = 0x%x", got)
-    }
+       input := []byte{1, 2, 3, 4, 5}
+       var f CheckSum
+
+       if got := f.compute(); got != nil {
+               t.Fatalf("compute() = %v; expected nil", got)
+       }
+
+       if _, err := f.Write(input); err != nil {
+               t.Fatalf("Write() err = %v; expected nil", err)
+       }
+
+       h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+       if _, err := h.Write(input); err != nil {
+               t.Fatal(err)
+       }
+
+       if got, expected := f.compute(), h.Sum(nil); !bytes.Equal(got, expected) {
+               t.Fatalf("compute() = %x; expected %x", got, expected)
+       } else {
+               t.Logf("compute() = 0x%x", got)
+       }
 }
-
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 1f29f7f..2800b92 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -21,10 +21,10 @@ import (
        "bytes"
        "encoding/binary"
        "fmt"
-       "github.com/golang/protobuf/proto"
        "io"
 
        "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/golang/protobuf/proto"
        log "github.com/sirupsen/logrus"
 )
 
@@ -172,9 +172,9 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
                        return nil, nil, err
                }
 
-               singleMessages, err := decodeBatchPayload(payloads, numMsg)
-               if err != nil {
-                       return nil, nil, err
+               singleMessages, e := decodeBatchPayload(payloads, numMsg)
+               if e != nil {
+                       return nil, nil, e
                }
 
                payloadList = make([][]byte, 0, numMsg)
@@ -185,7 +185,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
                        payloadList = append(payloadList, singleMsg.SinglePayload)
                }
 
-               if err := computeChecksum(chksum, expectedChksum); err != nil {
+               if err = computeChecksum(chksum, expectedChksum); err != nil {
                        return nil, nil, err
                }
                return msgMeta, payloadList, nil
@@ -207,7 +207,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
                payloadList = append(payloadList, payload)
        }
 
-       if err := computeChecksum(chksum, expectedChksum); err != nil {
+       if err = computeChecksum(chksum, expectedChksum); err != nil {
                return nil, nil, err
        }
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 0d7d3d8..2886a7c 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -404,15 +404,15 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 
 func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
        c.log.Debug("Got Message: ", response)
-       consumerId := response.GetConsumerId()
-       if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+       consumerID := response.GetConsumerId()
+       if consumer, ok := c.connWrapper.Consumers[consumerID]; ok {
                err := consumer.MessageReceived(response, payload)
                if err != nil {
-                       c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+                       c.log.WithField("consumerID", consumerID).Error("handle message err: ", response.MessageId)
                        return errors.New("handler not found")
                }
        } else {
-               c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+               c.log.WithField("consumerID", consumerID).Warn("Got unexpected message: ", response.MessageId)
        }
        return nil
 }
diff --git a/pulsar/internal/hash_test.go b/pulsar/internal/hash_test.go
index 2f9bc19..5e103cb 100644
--- a/pulsar/internal/hash_test.go
+++ b/pulsar/internal/hash_test.go
@@ -42,7 +42,8 @@ var murmurHashValues = []testProvider{
 }
 
 func TestJavaHash(t *testing.T) {
-       for _, p := range javaHashValues {
+       for _, javaHashValue := range javaHashValues {
+               p := javaHashValue
                t.Run(p.str, func(t *testing.T) {
                        assert.Equal(t, p.hash, JavaStringHash(p.str))
                })
@@ -50,7 +51,8 @@ func TestJavaHash(t *testing.T) {
 }
 
 func TestMurmurHash(t *testing.T) {
-       for _, p := range murmurHashValues {
+       for _, murmurHashValue := range murmurHashValues {
+               p := murmurHashValue
                t.Run(p.str, func(t *testing.T) {
                        assert.Equal(t, p.hash, Murmur3_32Hash(p.str))
                })
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index de7f058..0c548bc 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -95,8 +95,8 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
        return nil, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
-               message proto.Message) (*RPCResult, error) {
+func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
        assert.Fail(c.t, "Shouldn't be called")
        return nil, nil
 }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 2b68a71..66e8a79 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
        Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
                cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 
-       RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+       RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 
        RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 }
@@ -112,13 +112,13 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
        return rpcResult, nil
 }
 
-func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestId uint64, cmdType pb.BaseCommand_Type,
-               message proto.Message) (*RPCResult, error) {
+func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
        rpcResult := &RPCResult{
                Cnx: cnx,
        }
 
-       cnx.SendRequest(requestId, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
                rpcResult.Response = response
        })
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index e9327d0..391c38f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -169,7 +169,8 @@ func TestProducerCompression(t *testing.T) {
                {"zstd", ZSTD},
        }
 
-       for _, p := range providers {
+       for _, provider := range providers {
+               p := provider
                t.Run(p.name, func(t *testing.T) {
                        client, err := NewClient(ClientOptions{
                                URL: serviceURL,
@@ -290,6 +291,7 @@ func TestFlushInProducer(t *testing.T) {
                        "producer-id":   "test-producer-id",
                },
        })
+       assert.Nil(t, err)
        defer producer.Close()
 
        consumer, err := client.Subscribe(ConsumerOptions{
@@ -325,7 +327,7 @@ func TestFlushInProducer(t *testing.T) {
        wg.Wait()
 
        for i := 0; i < numOfMessages/2; i++ {
-               _, err := consumer.Receive(ctx)
+               _, err = consumer.Receive(ctx)
                assert.Nil(t, err)
                msgCount++
        }
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 1329be1..9930a99 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -57,8 +57,9 @@ func httpPut(url string, body interface{}) {
                "Content-Type": {"application/json"},
        }
 
-       _, err = client.Do(req)
+       resp, err := client.Do(req)
        if err != nil {
                log.Fatal(err)
        }
+       resp.Body.Close()
 }
diff --git a/pulsar/unackMsgTracker_test.go b/pulsar/unackMsgTracker_test.go
index d292fc8..edf7ddc 100644
--- a/pulsar/unackMsgTracker_test.go
+++ b/pulsar/unackMsgTracker_test.go
@@ -18,47 +18,48 @@
 package pulsar
 
 import (
-    `github.com/apache/pulsar-client-go/pkg/pb`
-    `github.com/golang/protobuf/proto`
-    `github.com/stretchr/testify/assert`
-    `testing`
+       "testing"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/golang/protobuf/proto"
+       "github.com/stretchr/testify/assert"
 )
 
 func TestUnackedMessageTracker(t *testing.T) {
-    unAckTracker := NewUnackedMessageTracker()
+       unAckTracker := NewUnackedMessageTracker()
 
-    var msgIDs []*pb.MessageIdData
+       var msgIDs []*pb.MessageIdData
 
-    for i := 0; i < 5; i++ {
-        msgID := &pb.MessageIdData{
-            LedgerId:   proto.Uint64(1),
-            EntryId:    proto.Uint64(uint64(i)),
-            Partition:  proto.Int32(-1),
-            BatchIndex: proto.Int32(-1),
-        }
+       for i := 0; i < 5; i++ {
+               msgID := &pb.MessageIdData{
+                       LedgerId:   proto.Uint64(1),
+                       EntryId:    proto.Uint64(uint64(i)),
+                       Partition:  proto.Int32(-1),
+                       BatchIndex: proto.Int32(-1),
+               }
 
-        msgIDs = append(msgIDs, msgID)
-    }
+               msgIDs = append(msgIDs, msgID)
+       }
 
-    for _, msgID := range msgIDs {
-        ok := unAckTracker.Add(msgID)
-        assert.True(t, ok)
-    }
+       for _, msgID := range msgIDs {
+               ok := unAckTracker.Add(msgID)
+               assert.True(t, ok)
+       }
 
-    flag := unAckTracker.IsEmpty()
-    assert.False(t, flag)
+       flag := unAckTracker.IsEmpty()
+       assert.False(t, flag)
 
-    num := unAckTracker.Size()
-    assert.Equal(t, num, 5)
+       num := unAckTracker.Size()
+       assert.Equal(t, num, 5)
 
-    for index, msgID := range msgIDs {
-        unAckTracker.Remove(msgID)
-        assert.Equal(t, 4-index, unAckTracker.Size())
-    }
+       for index, msgID := range msgIDs {
+               unAckTracker.Remove(msgID)
+               assert.Equal(t, 4-index, unAckTracker.Size())
+       }
 
-    num = unAckTracker.Size()
-    assert.Equal(t, num, 0)
+       num = unAckTracker.Size()
+       assert.Equal(t, num, 0)
 
-    flag = unAckTracker.IsEmpty()
-    assert.True(t, flag)
+       flag = unAckTracker.IsEmpty()
+       assert.True(t, flag)
 }
diff --git a/util/error.go b/util/error.go
index 6a051f5..755b7c0 100644
--- a/util/error.go
+++ b/util/error.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,36 +14,36 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package util
 
 import (
-    `fmt`
-    `github.com/apache/pulsar-client-go/pkg/pb`
+       "fmt"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
 )
 
 // NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
 // Optionally provide a list of IDs associated with the message
 // for additional context in the error message.
 func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg {
-    return &UnexpectedErrMsg{
-        msgType: msgType,
-        ids:     ids,
-    }
+       return &UnexpectedErrMsg{
+               msgType: msgType,
+               ids:     ids,
+       }
 }
 
 // UnexpectedErrMsg is returned when an unexpected message is received.
 type UnexpectedErrMsg struct {
-    msgType pb.BaseCommand_Type
-    ids     []interface{}
+       msgType pb.BaseCommand_Type
+       ids     []interface{}
 }
 
 // Error satisfies the error interface.
 func (e *UnexpectedErrMsg) Error() string {
-    msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
-    for _, id := range e.ids {
-        msg += fmt.Sprintf(" id=%v", id)
-    }
-    return msg
+       msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
+       for _, id := range e.ids {
+               msg += fmt.Sprintf(" id=%v", id)
+       }
+       return msg
 }
diff --git a/util/util_test.go b/util/util_test.go
index 284dd0c..3b0a9f9 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -19,13 +19,14 @@ package util
 
 import (
        "fmt"
-       "github.com/stretchr/testify/assert"
        "strings"
        "testing"
+
+       "github.com/stretchr/testify/assert"
 )
 
 func TestIsNil(t *testing.T) {
-       var a interface{} = nil
+       var a interface{}
        var b interface{} = (*int)(nil)
 
        assert.True(t, a == nil)
@@ -35,6 +36,6 @@ func TestIsNil(t *testing.T) {
 func TestRemoveDuplicateElement(t *testing.T) {
        s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
        resList := RemoveDuplicateElement(s)
-       res := fmt.Sprintf("%s", resList[:])
+       res := fmt.Sprintf("%s", resList)
        assert.Equal(t, 1, strings.Count(res, "hello"))
 }

Reply via email to