merlimat closed pull request #1251: Fix lookup problem with partions in a non-persistent topics URL: https://github.com/apache/incubator-pulsar/pull/1251
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b459011d0..5d83f8dda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1094,8 +1094,8 @@ protected void internalExpireMessages(String subName, int expireTimeInSeconds, b throw ex; } - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), - "persistent", dn.getEncodedLocalName()); + String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), dn.getDomain().toString(), + dn.getEncodedLocalName()); // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index cebce5783..5f45bc4c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -186,6 +186,56 @@ public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exce } + @Test(dataProvider = "subscriptionType") + public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception { + log.info("-- Starting {} test --", methodName); + + final int numPartitions = 5; + final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic"; + admin.nonPersistentTopics().createPartitionedTopic(topic, numPartitions); + + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT) + .statsInterval(0, TimeUnit.SECONDS).build(); + Consumer consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1").subscriptionType(type) + .subscribe(); + + Producer producer = client.newProducer().topic(topic).create(); + + // Ensure all partitions exist + for (int i = 0; i < numPartitions; i++) { + DestinationName partition = DestinationName.get(topic).getPartition(i); + assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString())); + } + + int totalProduceMsg = 500; + for (int i = 0; i < totalProduceMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + + Message msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < totalProduceMsg; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + consumer.acknowledge(msg); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } else { + break; + } + } + assertEquals(messageSet.size(), totalProduceMsg); + + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + client.close(); + } + /** * It verifies that broker doesn't dispatch messages if consumer runs out of permits * filled out with messages ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services