stevenwangnarvar opened a new issue #5235: lost message when not ack in 
partitioned topics in go client
URL: https://github.com/apache/pulsar/issues/5235
 
 
   **Describe the bug**
   I tried to test the fix in https://github.com/apache/pulsar/pull/4653, but found 
that messages are lost when unacknowledged messages are redelivered for reprocessing.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a partitioned topic
   2. Run a test Go program that sends several messages and receives them without 
acknowledging them.
   3. Check the messages received on redelivery to see whether any are missing.
   
   **Expected behavior**
   No messages should be lost.
   
   **Screenshots**
   <img width="979" alt="Screen Shot 2019-09-20 at 1 25 10 PM" 
src="https://user-images.githubusercontent.com/49885838/65297627-b2b43580-dbbc-11e9-9739-531696a82580.png">
   <img width="798" alt="Screen Shot 2019-09-20 at 1 24 59 PM" 
src="https://user-images.githubusercontent.com/49885838/65297632-b8aa1680-dbbc-11e9-8e31-eaf151596201.png">
   
   
   **Desktop (please complete the following information):**
    - OS: Mac
   
   **Additional context**
   Queue info:
   ```
   /pulsar-admin topics create-partitioned-topic --partitions 4 persistent://dev/genericfileproc/npulsar_test_output_topic
   ```
   Test code 
   ```
   package main
   
   import (
        "context"
        "fmt"
        "log"
        "time"
   
        "github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
        topic := "persistent://dev/genericfileproc/npulsar_test_output_topic"
   
        // Instantiate a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        // Use the client to instantiate a producer
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
                Topic: topic,
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        // Use the client object to instantiate a consumer
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            topic,
                SubscriptionName: "gfp-input-carrier-flat-file",
                Type:             pulsar.Shared,
                AckTimeout:       15 * time.Second,
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        ctx := context.Background()
        // Send 5 messages synchronously
        for i := 0; i < 5; i++ {
                // Create a message
                msg := pulsar.ProducerMessage{
                        Payload: []byte(fmt.Sprintf("message-%d", i)),
                }
                // Attempt to send the message
                if err := producer.Send(ctx, msg); err != nil {
                        log.Fatal(err)
                }
        }
   
        // Listen indefinitely on the topic
        for {
                msg, err := consumer.Receive(ctx)
                if err != nil {
                        log.Fatal(err)
                }
                // Print the arrival time and payload of each delivery
                fmt.Println(time.Now().Format(time.RFC850) + " consume " + string(msg.Payload()))
                // Ack is deliberately disabled so that messages are redelivered after AckTimeout
                //consumer.Ack(msg)
        }
   }
   ```
   
