stevenwangnarvar opened a new issue #5235: lost message when not ack in partitioned topics in go client
URL: https://github.com/apache/pulsar/issues/5235

**Describe the bug**
While testing the fix in https://github.com/apache/pulsar/pull/4653, I found that messages are lost when unacknowledged messages are re-processed.

**To Reproduce**
Steps to reproduce the behavior:
1. Create a partitioned topic.
2. Run a test Go program that sends several messages and receives them without acking them.
3. Check the received messages to see whether any are lost.

**Expected behavior**
No messages should be lost.

**Screenshots**
<img width="979" alt="Screen Shot 2019-09-20 at 1 25 10 PM" src="https://user-images.githubusercontent.com/49885838/65297627-b2b43580-dbbc-11e9-9739-531696a82580.png">
<img width="798" alt="Screen Shot 2019-09-20 at 1 24 59 PM" src="https://user-images.githubusercontent.com/49885838/65297632-b8aa1680-dbbc-11e9-8e31-eaf151596201.png">

**Desktop (please complete the following information):**
- OS: Mac

**Additional context**
Queue info:
```
/pulsar-admin topics create-partitioned-topic --partitions 4 persistent://dev/genericfileproc/npulsar_test_output_topic
```
Test code:
```
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar/pulsar-client-go/pulsar"
)

func main() {
	topic := "persistent://dev/genericfileproc/npulsar_test_output_topic"

	// Instantiate a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Use the client to instantiate a producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Use the client object to instantiate a consumer
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "gfp-input-carrier-flat-file",
		Type:             pulsar.Shared,
		AckTimeout:       15 * time.Second,
	})
	// The subscribe error was previously left unchecked
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// Send 5 messages synchronously
	for i := 0; i < 5; i++ {
		// Create a message
		msg := pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("message-%d", i)),
		}

		// Attempt to send the message
		if err := producer.Send(ctx, msg); err != nil {
			log.Fatal(err)
		}
	}

	// Listen indefinitely on the topic
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// Do something with the message
		fmt.Println(time.Now().Format(time.RFC850) + " consume " + string(msg.Payload()))

		// Intentionally not acked, so each message should be redelivered
		// after the 15-second ack timeout
		//consumer.Ack(msg)
	}
}
```
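Step 3 of the reproduction (checking whether any messages are lost) can be made mechanical instead of reading the console output by eye. The following is a minimal sketch that does not talk to Pulsar at all: it only compares the payloads that were sent with the payloads the consumer loop printed. The names `deliveryReport` and `checkDeliveries` are hypothetical helpers, and the `received` slice in `main` is made-up illustrative data, not the output captured in the screenshots.

```go
package main

import (
	"fmt"
	"sort"
)

// deliveryReport is a hypothetical summary type: how often each payload was
// delivered, and which sent payloads never showed up.
type deliveryReport struct {
	Counts  map[string]int // deliveries per payload, redeliveries included
	Missing []string       // sent payloads that were never received
}

// checkDeliveries compares the sent payloads with the payloads seen by the
// consumer. A payload counts as lost only if it was never delivered; repeated
// deliveries are expected when messages are not acked.
func checkDeliveries(sent, received []string) deliveryReport {
	counts := make(map[string]int, len(sent))
	for _, p := range received {
		counts[p]++
	}

	var missing []string
	for _, p := range sent {
		if counts[p] == 0 {
			missing = append(missing, p)
		}
	}
	sort.Strings(missing)

	return deliveryReport{Counts: counts, Missing: missing}
}

func main() {
	// The five payloads produced by the test program.
	sent := make([]string, 0, 5)
	for i := 0; i < 5; i++ {
		sent = append(sent, fmt.Sprintf("message-%d", i))
	}

	// Illustrative data only (an assumption, not taken from the issue):
	// three payloads delivered, then redelivered after the ack timeout.
	received := []string{
		"message-0", "message-1", "message-3",
		"message-0", "message-1", "message-3",
	}

	report := checkDeliveries(sent, received)
	fmt.Println("missing:", report.Missing)
	for _, p := range sent {
		fmt.Printf("%s delivered %d time(s)\n", p, report.Counts[p])
	}
}
```

In the real test, `received` would be filled inside the receive loop by appending `string(msg.Payload())`, and the check would run after waiting longer than the 15-second `AckTimeout` so that at least one redelivery round has happened.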
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services