merlimat closed pull request #1251: Fix lookup problem with partions in a 
non-persistent topics
URL: https://github.com/apache/incubator-pulsar/pull/1251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b459011d0..5d83f8dda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1094,8 +1094,8 @@ protected void internalExpireMessages(String subName, int 
expireTimeInSeconds, b
                 throw ex;
             }
 
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(),
-                    "persistent", dn.getEncodedLocalName());
+            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
dn.getNamespace(), dn.getDomain().toString(),
+                    dn.getEncodedLocalName());
 
             // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
             // serve/redirect request else fail partitioned-metadata-request 
so, client fails while creating
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index cebce5783..5f45bc4c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -186,6 +186,56 @@ public void 
testPartitionedNonPersistentTopic(SubscriptionType type) throws Exce
 
     }
 
+    @Test(dataProvider = "subscriptionType")
+    public void 
testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int numPartitions = 5;
+        final String topic = 
"non-persistent://my-property/use/my-ns/partitioned-topic";
+        admin.nonPersistentTopics().createPartitionedTopic(topic, 
numPartitions);
+
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT)
+                .statsInterval(0, TimeUnit.SECONDS).build();
+        Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("subscriber-1").subscriptionType(type)
+                .subscribe();
+
+        Producer producer = client.newProducer().topic(topic).create();
+
+        // Ensure all partitions exist
+        for (int i = 0; i < numPartitions; i++) {
+            DestinationName partition = 
DestinationName.get(topic).getPartition(i);
+            
assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString()));
+        }
+
+        int totalProduceMsg = 500;
+        for (int i = 0; i < totalProduceMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            Thread.sleep(10);
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < totalProduceMsg; i++) {
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            } else {
+                break;
+            }
+        }
+        assertEquals(messageSet.size(), totalProduceMsg);
+
+        producer.close();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+        client.close();
+    }
+
     /**
      * It verifies that broker doesn't dispatch messages if consumer runs out 
of permits
      * filled out with messages


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to