This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dfb8fd  Add producer check state before send msg. (#569)
1dfb8fd is described below

commit 1dfb8fdffb97d0c4f4845cc668bed485324daf5c
Author: Zhiqiang Li <stu...@qq.com>
AuthorDate: Wed Jul 21 17:43:23 2021 +0800

    Add producer check state before send msg. (#569)
    
    Add producer state check before send msg.
---
 pulsar/error.go              |  4 ++++
 pulsar/producer_partition.go |  7 +++++++
 pulsar/producer_test.go      | 30 ++++++++++++++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/pulsar/error.go b/pulsar/error.go
index 60a832b..f433bfc 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -99,6 +99,8 @@ const (
        AddToBatchFailed
        // SeekFailed seek failed
        SeekFailed
+       // ProducerClosed means producer already been closed
+       ProducerClosed
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -201,6 +203,8 @@ func getResultStr(r Result) string {
                return "AddToBatchFailed"
        case SeekFailed:
                return "SeekFailed"
+       case ProducerClosed:
+               return "ProducerClosed"
        default:
                return fmt.Sprintf("Result(%d)", r)
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8b3d33d..7e83bfa 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -50,6 +50,7 @@ var (
        errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue 
is full")
        errContextExpired  = newError(TimeoutError, "message send context 
expired")
        errMessageTooLarge = newError(MessageTooBig, "message size exceeds 
MaxMessageSize")
+       errProducerClosed  = newError(ProducerClosed, "producer already been 
closed")
 
        buffersPool sync.Pool
 )
@@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, 
msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
+       if p.getProducerState() != producerReady {
+               // Producer is closing
+               callback(nil, msg, errProducerClosed)
+               return
+       }
+
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3dbd7..bbe8028 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) {
        assert.Equal(t, 10, metric.sendn)
        assert.Equal(t, 10, metric.ackn)
 }
+
+func TestProducerSendAfterClose(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: newTopicName(),
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, ID)
+
+       producer.Close()
+       ID, err = producer.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+       })
+       assert.Nil(t, ID)
+       assert.Error(t, err)
+}

Reply via email to