sijie closed pull request #2320: Fix flaky test PersistentTopicE2ETest.testMessageRedelivery URL: https://github.com/apache/incubator-pulsar/pull/2320
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index a5beefaceb..ee9fb74c5c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1211,30 +1211,33 @@ public void testPayloadCorruptionDetection() throws Exception { * 1. produce messages 2. consume messages and ack all except 1 msg 3. Verification: should replay only 1 unacked * message */ - @Test() + @Test public void testMessageRedelivery() throws Exception { final String topicName = "persistent://prop/ns-abc/topic2"; final String subName = "sub2"; - Message<byte[]> msg; + Message<String> msg; int totalMessages = 10; - Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer() - .topic(topicName) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (1) Produce messages for (int i = 0; i < totalMessages; i++) { - String message = "my-message-" + i; - 
producer.send(message.getBytes()); + producer.send("my-message-" + i); } // (2) Consume and ack messages except first message - Message<byte[]> unAckedMsg = null; + Message<String> unAckedMsg = null; for (int i = 0; i < totalMessages; i++) { msg = consumer.receive(); if (i == 0) { @@ -1249,7 +1252,7 @@ public void testMessageRedelivery() throws Exception { // Verify: msg [L:0] must be redelivered try { msg = consumer.receive(1, TimeUnit.SECONDS); - assertEquals(new String(msg.getData()), new String(unAckedMsg.getData())); + assertEquals(msg.getValue(), unAckedMsg.getValue()); } catch (Exception e) { fail("msg should be redelivered ", e); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services