This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0cc5e81  Issue #1452: remove reachedEndOfTopic in addConsumer (#2301)
0cc5e81 is described below

commit 0cc5e81fcc5ac3859ee463556e6c1ed7e05d29a3
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Mon Aug 6 07:49:42 2018 +0800

    Issue #1452: remove reachedEndOfTopic in addConsumer (#2301)
    
    ### Motivation
    
    Fixes #1452
    
    In issue #1452 , reachedEndOfTopic was called twice if a topic has been 
terminated before subscription.
    It may be better to call `reachedEndOfTopic`, when real read/ack happened 
to the subscription, so delete the calling in `addConsumer` to avoid dup 
calling.
    
    ### Modifications
    
    remove dup calling in `addConsumer` .
    add related ut.
    
    ### Result
    
    Expected all ut passed.
---
 .../service/persistent/PersistentSubscription.java |  5 ---
 .../client/api/SimpleProducerConsumerTest.java     | 42 ++++++++++++++++++++--
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8a77bfa..0842742 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -103,11 +103,6 @@ public class PersistentSubscription implements 
Subscription {
             throw new SubscriptionFencedException("Subscription is fenced");
         }
 
-        if (topic.getManagedLedger().isTerminated() && 
cursor.getNumberOfEntriesInBacklog() == 0) {
-            // Immediately notify the consumer that there are no more 
available messages
-            consumer.reachedEndOfTopic();
-        }
-
         if (dispatcher == null || !dispatcher.isConsumerConnected()) {
             switch (consumer.subType()) {
             case Exclusive:
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 79166a1..61bdad0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2434,7 +2434,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
                 .cryptoKeyReader(new EncKeyReader()).create();
-        
+
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1")
                 
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                 .subscribe();
@@ -2450,7 +2450,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
-    
+
     private String decryptMessage(TopicMessageImpl<byte[]> msg, String 
encryptionKeyName, CryptoKeyReader reader)
             throws Exception {
         Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
@@ -2624,4 +2624,42 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    // Issue 1452: https://github.com/apache/incubator-pulsar/issues/1452
+    // reachedEndOfTopic should be called only once if a topic has been 
terminated before subscription
+    @Test
+    public void testReachedEndOfTopic() throws Exception
+    {
+        String topicName = 
"persistent://my-property/my-ns/testReachedEndOfTopic";
+        Producer producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false).create();
+        producer.close();
+
+        admin.topics().terminateTopicAsync(topicName).get();
+
+        CountDownLatch latch = new CountDownLatch(2);
+        Consumer consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName("my-subscriber-name")
+            .messageListener(new MessageListener()
+            {
+                @Override
+                public void reachedEndOfTopic(Consumer consumer)
+                {
+                    log.info("called reachedEndOfTopic  {}", methodName);
+                    latch.countDown();
+                }
+
+                @Override
+                public void received(Consumer consumer, Message message)
+                {
+                    // do nothing
+                }
+            })
+            .subscribe();
+
+        assertFalse(latch.await(1, TimeUnit.SECONDS));
+        assertEquals(latch.getCount(), 1);
+        consumer.close();
+    }
 }

Reply via email to