[incubator-pulsar] branch asf-site updated: Updated site at revision d073a7c
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/asf-site by this push: new 2b78809 Updated site at revision d073a7c 2b78809 is described below commit 2b78809cb8e814679af67c38c50e7b5908033f8f Author: jenkins AuthorDate: Thu Feb 22 08:12:00 2018 + Updated site at revision d073a7c --- content/api/client/index-all.html | 18 +++ .../org/apache/pulsar/client/api/PulsarClient.html | 126 - 2 files changed, 139 insertions(+), 5 deletions(-) diff --git a/content/api/client/index-all.html b/content/api/client/index-all.html index e1f035a..1760ef3 100644 --- a/content/api/client/index-all.html +++ b/content/api/client/index-all.html @@ -1597,6 +1597,14 @@ Use PulsarClient.newConsumer() to build a new consumer +subscribe(Collection, String) - Method in interface org.apache.pulsar.client.api.PulsarClient + +Subscribe to the given topic and subscription combination with default ConsumerConfiguration + +subscribe(Collection , String, ConsumerConfiguration) - Method in interface org.apache.pulsar.client.api.PulsarClient + +Subscribe to the given topics and subscription combination using given ConsumerConfiguration + subscribeAsync() - Method in interface org.apache.pulsar.client.api.ConsumerBuilder Finalize the Consumer creation by subscribing to the topic in asynchronous mode. 
@@ -1613,6 +1621,16 @@ Use PulsarClient.newConsumer() to build a new consumer +subscribeAsync(Collection , String) - Method in interface org.apache.pulsar.client.api.PulsarClient + +Asynchronously subscribe to the given topics and subscription combination with + default ConsumerConfiguration + +subscribeAsync(Collection , String, ConsumerConfiguration) - Method in interface org.apache.pulsar.client.api.PulsarClient + +Asynchronously subscribe to the given topics and subscription combination using given + ConsumerConfiguration + subscriptionName(String) - Method in interface org.apache.pulsar.client.api.ConsumerBuilder Specify the subscription name for this consumer. diff --git a/content/api/client/org/apache/pulsar/client/api/PulsarClient.html b/content/api/client/org/apache/pulsar/client/api/PulsarClient.html index 0d6a841..86c6a10 100644 --- a/content/api/client/org/apache/pulsar/client/api/PulsarClient.html +++ b/content/api/client/org/apache/pulsar/client/api/PulsarClient.html @@ -17,7 +17,7 @@ catch(err) { } //--> -var methods = {"i0":17,"i1":6,"i2":6,"i3":49,"i4":49,"i5":38,"i6":38,"i7":38,"i8":38,"i9":38,"i10":38,"i11":6,"i12":6,"i13":6,"i14":6,"i15":38,"i16":38,"i17":38,"i18":38}; +var methods = {"i0":17,"i1":6,"i2":6,"i3":49,"i4":49,"i5":38,"i6":38,"i7":38,"i8":38,"i9":38,"i10":38,"i11":6,"i12":6,"i13":6,"i14":6,"i15":6,"i16":6,"i17":38,"i18":38,"i19":6,"i20":6,"i21":38,"i22":38}; var tabs = {65535:["t0","All Methods"],1:["t1","Static Methods"],2:["t2","Instance Methods"],4:["t3","Abstract Methods"],16:["t5","Default Methods"],32:["t6","Deprecated Methods"]}; var altColor = "altColor"; var rowColor = "rowColor"; @@ -242,6 +242,21 @@ extends Closeable Consumer +subscribe(Collection topics, + String subscription) +Subscribe to the given topic and subscription combination with default ConsumerConfiguration + + + +Consumer +subscribe(Collection topics, + String subscription, + ConsumerConfiguration conf) +Subscribe to the given topics and subscription 
combination using given ConsumerConfiguration + + + +Consumer subscribe(String topic, String subscription) Deprecated. @@ -249,7 +264,7 @@ extends Closeable - + Consumer subscribe(String topic, String subscription, @@ -259,7 +274,24 @@ extends Closeable - + +CompletableFuture +subscribeAsync(Collection topics, + String subscription) +Asynchronously subscribe to the given topics and subscription combination with + default ConsumerConfiguration + + + +CompletableFuture +subscribeAsync(Collection topics, + String subscription, + ConsumerConfiguration conf) +Asynchronously subscribe to the given topics and subscription combination using given + ConsumerConfiguration + + + CompletableFuture subscribeAsync(String topic, String subscription) @@ -268,7 +300,7 @@ extends Closeable - + CompletableFuture subscribeAsync(String topic, String subscription, @@ -706,7 +738,7 @@ CompletableFuture< - + shutdown void shutdown() @@ -720,6 +752,90 @@ CompletableFuture< + + + + +subscribe +Consumer subscribe(Collection topics, + String subscription) +throws PulsarClientException +Subscribe to the given topic and subscription combination
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898353 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -148,20 +148,23 @@ public void testSyncProducerAndConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); -ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect -Producer producer1 = pulsarClient.createProducer(topicName1); -Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); -Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); +Producer producer1 = pulsarClient.newProducer().topic(topicName1) +.create(); +Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); +Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) Review comment: `receiverQueueSize(4)`? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898450 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -205,20 +208,23 @@ public void testAsyncConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); -ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect -Producer producer1 = pulsarClient.createProducer(topicName1); -Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); -Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); +Producer producer1 = pulsarClient.newProducer().topic(topicName1) +.create(); +Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); +Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) Review comment: `receiverQueueSize(4)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169899089 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java ## @@ -77,22 +81,51 @@ public Consumer subscribe() throws PulsarClientException { @Override public CompletableFuture subscribeAsync() { -if (topicName == null) { +if (topicNames == null || topicNames.isEmpty()) { return FutureUtil -.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); +.failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder")); } if (subscriptionName == null) { return FutureUtil.failedFuture( -new IllegalArgumentException("Subscription name must be set on the producer builder")); +new IllegalArgumentException("Subscription name must be set on the consumer builder")); } -return client.subscribeAsync(topicName, subscriptionName, conf); +if (topicNames.size() == 1) { +return client.subscribeAsync(topicNames.get(0), subscriptionName, conf); +} else { +return client.subscribeAsync(topicNames, subscriptionName, conf); +} } @Override -public ConsumerBuilder topic(String topicName) { -this.topicName = topicName; +public ConsumerBuilder topic(String... 
topicNames) { +checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty."); +if (this.topicNames == null) { +this.topicNames = Lists.newArrayList(); +} + +for (int i = 0; i < topicNames.length; i++) { +if (!this.topicNames.contains(topicNames[i])) { +this.topicNames.add(topicNames[i]); +} +} +return this; +} + +@Override +public ConsumerBuilder topics(List topicNames) { +checkArgument(topicNames != null && !topicNames.isEmpty(), +"Passed in topicNames list should not be empty."); +if (this.topicNames == null) { +this.topicNames = topicNames; +} else { +topicNames.forEach(name -> { Review comment: this.topicNames.addAll(topicNames) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898223 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -103,18 +101,20 @@ public void testGetConsumersAndGetTopics() throws Exception { final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; -List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); +List topicNames = Lists.newArrayList(topicName1, topicName2); admin.properties().createProperty("prop", new PropertyAdmin()); admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.topic(topicName3) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) Review comment: you need configure `receiverQueueSize(4)` no? there was `setReceiverQueueSize(4)` at ConsumerConfiguration? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898053 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -82,12 +79,13 @@ public void testDifferentTopicsNameSubscribe() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); try { -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) Review comment: you need configure `receiverQueueSize(4)` no? there was `setReceiverQueueSize(4)` at ConsumerConfiguration? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898906 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java ## @@ -40,9 +44,9 @@ private static final long serialVersionUID = 1L; private final PulsarClientImpl client; -private String topicName; private String subscriptionName; private final ConsumerConfiguration conf; +private List topicNames; Review comment: I would suggest using a Set instead of a List, so that duplicated topic names are removed automatically. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer
sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898511 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -280,20 +286,23 @@ public void testConsumerUnackedRedelivery() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); -ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect -Producer producer1 = pulsarClient.createProducer(topicName1); -Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); -Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); +Producer producer1 = pulsarClient.newProducer().topic(topicName1) +.create(); +Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); +Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) +.create(); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) Review comment: `receiverQueueSize(4)`? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] zhaijack commented on issue #1269: Issue #1237: support builder for topicsConsumer
zhaijack commented on issue #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269#issuecomment-367628188 @sijie Thanks, I have changed it. receiverQueueSize was added to test some functionality; I have added it back here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] merlimat commented on issue #1259: Is there any relationship between Yahoo Message Broker and Pulsar projects?
merlimat commented on issue #1259: Is there any relationship between Yahoo Message Broker and Pulsar projects? URL: https://github.com/apache/incubator-pulsar/issues/1259#issuecomment-367759969 > Am I right to choose Pulsar as the right option to realize the application? That is exactly the same scenario as the Sherpa use case, so I would definitely say that it is a good match. We literally designed it for that use case :) > would you give me some advice? thanks a lot! Sure, send any questions or doubts either here, on the mailing lists, or in the Slack channel. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] rdhabalia closed pull request #1268: Add get-peer clusters admin api
rdhabalia closed pull request #1268: Add get-peer clusters admin api URL: https://github.com/apache/incubator-pulsar/pull/1268 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 733105a37..de670c136 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -213,6 +213,28 @@ public void setPeerClusterNames(@PathParam("cluster") String cluster, LinkedHash } } + @GET + @Path("/{cluster}/peers") + @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = Set.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public Set getPeerCluster(@PathParam("cluster") String cluster) { + validateSuperUserAccess(); + + try { + String clusterPath = path("clusters", cluster); + byte[] content = globalZk().getData(clusterPath, null, null); + ClusterData clusterData = jsonMapper().readValue(content, ClusterData.class); + return clusterData.getPeerClusterNames(); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), cluster); + throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + } + @DELETE @Path("/{cluster}") @ApiOperation(value = "Delete an existing cluster") diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index c344af55a..d5456fcca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -151,6 +154,22 @@ public void testPeerClusterTopicLookup(String protocol) throws Exception { } + @Test + public void testGetPeerClusters() throws Exception { + final String mainClusterName = "r1"; + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null); + LinkedHashSet peerClusters = Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3")); + admin1.clusters().updatePeerClusterNames(mainClusterName, peerClusters); + retryStrategically((test) -> { + try { + return admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 100); + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), peerClusters); + } + private static final Logger log = LoggerFactory.getLogger(PeerReplicatorTest.class); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 779e7b117..4048367de 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -21,6 +21,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.pulsar.client.a
[incubator-pulsar] branch master updated: Add get-peer clusters admin api (#1268)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7b07fce Add get-peer clusters admin api (#1268) 7b07fce is described below commit 7b07fce3ccf633f705b5bbe1de3248c5abdcac10 Author: Rajan Dhabalia AuthorDate: Thu Feb 22 11:06:19 2018 -0800 Add get-peer clusters admin api (#1268) --- .../pulsar/broker/admin/impl/ClustersBase.java | 22 + .../pulsar/broker/service/PeerReplicatorTest.java | 19 ++ .../org/apache/pulsar/client/admin/Clusters.java | 23 ++ .../pulsar/client/admin/internal/ClustersImpl.java | 10 ++ .../org/apache/pulsar/admin/cli/CmdClusters.java | 14 + .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +++ 6 files changed, 91 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 733105a..de670c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -213,6 +213,28 @@ public class ClustersBase extends AdminResource { } } + @GET + @Path("/{cluster}/peers") + @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = Set.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public Set getPeerCluster(@PathParam("cluster") String cluster) { + validateSuperUserAccess(); + + try { + String clusterPath = path("clusters", cluster); + byte[] content = globalZk().getData(clusterPath, null, null); + ClusterData clusterData = jsonMapper().readValue(content, ClusterData.class); + return clusterData.getPeerClusterNames(); + } catch 
(KeeperException.NoNodeException e) { + log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), cluster); + throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + } + @DELETE @Path("/{cluster}") @ApiOperation(value = "Delete an existing cluster") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index c344af5..d5456fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -151,6 +154,22 @@ public class PeerReplicatorTest extends ReplicatorTestBase { } + @Test + public void testGetPeerClusters() throws Exception { + final String mainClusterName = "r1"; + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null); + LinkedHashSet peerClusters = Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3")); + admin1.clusters().updatePeerClusterNames(mainClusterName, peerClusters); + retryStrategically((test) -> { + try { + return admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1; + } 
catch (PulsarAdminException e) { + return false; + } + }, 5, 100); + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), peerClusters); + } + private static final Logger log = LoggerFactory.getLogger(PeerReplicatorTest.class); } diff --git a/pulsar-
[GitHub] merlimat opened a new pull request #1270: Added ManagedLedger perf tool
merlimat opened a new pull request #1270: Added ManagedLedger perf tool URL: https://github.com/apache/incubator-pulsar/pull/1270 ### Motivation Added a tool to run performance tests directly on BookKeeper, using the ManagedLedger API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services
[GitHub] merlimat closed pull request #1269: Issue #1237: support builder for topicsConsumer
merlimat closed pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index a2cb6d579..2aa386e71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,12 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -82,12 +79,13 @@ public void testDifferentTopicsNameSubscribe() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. 
Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); try { -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) +.subscribe(); fail("subscribe for topics from different namespace should fail."); } catch (IllegalArgumentException e) { // expected for have different namespace @@ -103,18 +101,21 @@ public void testGetConsumersAndGetTopics() throws Exception { final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; -List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); +List topicNames = Lists.newArrayList(topicName1, topicName2); admin.properties().createProperty("prop", new PropertyAdmin()); admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. 
Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.topic(topicName3) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) +.receiverQueueSize(4) +.subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); List topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics(); @@ -148,20 +149,24 @@ public void testSyncProducerAndConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); -ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect -Producer producer1 = pulsarClient.createProducer(topicName1); -Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); -Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); +Producer producer1 = pulsarClient.newProducer().topic(topicName1) +.create(); +Producer producer2 = pulsarClient.newProducer().topic(topicName2) +
[incubator-pulsar] branch master updated: Issue #1237: support builder for topicsConsumer (#1269)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 68cd115 Issue #1237: support builder for topicsConsumer (#1269) 68cd115 is described below commit 68cd1154e9581bb85032da5a8e1d44336b25 Author: Jia Zhai AuthorDate: Thu Feb 22 11:38:59 2018 -0800 Issue #1237: support builder for topicsConsumer (#1269) * support builder for topicsConsumer * change following @Matteo's comments to remove subscribe methods in client * change to topic(String ... names) for consumer builder * change following @sijie's comments --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 197 +++-- .../apache/pulsar/client/api/ConsumerBuilder.java | 16 +- .../org/apache/pulsar/client/api/PulsarClient.java | 57 -- .../pulsar/client/impl/ConsumerBuilderImpl.java| 41 - .../pulsar/client/impl/PulsarClientImpl.java | 32 5 files changed, 186 insertions(+), 157 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index a2cb6d5..2aa386e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,12 +33,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import 
org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -82,12 +79,13 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); try { -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) +.subscribe(); fail("subscribe for topics from different namespace should fail."); } catch (IllegalArgumentException e) { // expected for have different namespace @@ -103,18 +101,21 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; -List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); +List topicNames = Lists.newArrayList(topicName1, topicName2); admin.properties().createProperty("prop", new PropertyAdmin()); admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. 
Create consumer -ConsumerConfiguration conf = new ConsumerConfiguration(); -conf.setReceiverQueueSize(4); -conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); -conf.setSubscriptionType(SubscriptionType.Shared); -Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.topic(topicName3) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) +.receiverQueueSize(4) +.subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); List topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics(); @@ -148,20 +149,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTo
[GitHub] lucperkins opened a new pull request #1271: Message deduplication documentation (WIP)
lucperkins opened a new pull request #1271: Message deduplication documentation (WIP) URL: https://github.com/apache/incubator-pulsar/pull/1271 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on issue #1165: PIP-13-2/3: support regex based subscription
zhaijack commented on issue #1165: PIP-13-2/3: support regex based subscription URL: https://github.com/apache/incubator-pulsar/pull/1165#issuecomment-367842452 I would like to close this one and open a new PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Broker should not start replicator for root partitioned-topic (#1262)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new f38a003 Broker should not start replicator for root partitioned-topic (#1262) f38a003 is described below commit f38a003a2a8331da94996fc7ea871abfc779ae24 Author: Rajan Dhabalia AuthorDate: Thu Feb 22 18:30:58 2018 -0800 Broker should not start replicator for root partitioned-topic (#1262) * Broker should not start replicator for root partitioned-topic * address comment --- .../pulsar/broker/service/AbstractReplicator.java | 51 +-- .../pulsar/broker/service/BrokerService.java | 53 +--- .../nonpersistent/NonPersistentReplicator.java | 3 +- .../service/nonpersistent/NonPersistentTopic.java | 35 +-- .../service/persistent/PersistentReplicator.java | 3 +- .../broker/service/persistent/PersistentTopic.java | 39 ++-- .../pulsar/broker/service/ReplicatorTest.java | 72 ++ .../pulsar/broker/service/ReplicatorTestBase.java | 3 + 8 files changed, 220 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 49213c9..4642a85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,17 +18,21 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.AbstractReplicator.State; +import org.apache.pulsar.broker.admin.AdminResource; +import 
org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.DestinationName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +61,9 @@ public abstract class AbstractReplicator { Stopped, Starting, Started, Stopping } -public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, -String remoteCluster, BrokerService brokerService) { +public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, +BrokerService brokerService) throws NamingException { +validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; @@ -69,8 +74,7 @@ public abstract class AbstractReplicator { this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerBuilder = client.newProducer() // -.topic(topicName) -.sendTimeout(0, TimeUnit.SECONDS) // +.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); @@ -211,5 +215,42 @@ public abstract class AbstractReplicator { return (replicatorPrefix + "." + cluster).intern(); } +/** + * Replication can't be started on root-partitioned-topic to avoid producer startup conflict. + * + * + * eg: + * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then + * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2". 
+ * + * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual + * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing + * replicator producers. + * + * + * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned + * producers. + * + * @param topicName + * @param brokerService + */ +private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException { +DestinationName d
[GitHub] rdhabalia closed pull request #1262: Broker should not start replicator for root partitioned-topic
rdhabalia closed pull request #1262: Broker should not start replicator for root partitioned-topic URL: https://github.com/apache/incubator-pulsar/pull/1262 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 49213c90d..4642a8503 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,17 +18,21 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.AbstractReplicator.State; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.DestinationName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +61,9 @@ Stopped, Starting, Started, Stopping } -public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, -String remoteCluster, BrokerService brokerService) { +public AbstractReplicator(String 
topicName, String replicatorPrefix, String localCluster, String remoteCluster, +BrokerService brokerService) throws NamingException { +validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; @@ -69,8 +74,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerBuilder = client.newProducer() // -.topic(topicName) -.sendTimeout(0, TimeUnit.SECONDS) // +.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); @@ -211,5 +215,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster) return (replicatorPrefix + "." + cluster).intern(); } +/** + * Replication can't be started on root-partitioned-topic to avoid producer startup conflict. + * + * + * eg: + * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then + * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2". + * + * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual + * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing + * replicator producers. + * + * + * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned + * producers. 
+ * + * @param topicName + * @param brokerService + */ +private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException { +DestinationName destination = DestinationName.get(topicName); +String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, +destination.getNamespace().toString(), destination.getDomain().toString(), +destination.getEncodedLocalName()); +boolean isPartitionedTopic = false; +try { +isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache() +.get(partitionedTopicPath).isPresent(); +} catch (Exception e) { +log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage()); +} +if (isPartitionedTopic) { +throw new NamingException( +topicName + " is a partitioned-topic and replication can't
[GitHub] sijie commented on issue #1186: Pulsar FAQ
sijie commented on issue #1186: Pulsar FAQ URL: https://github.com/apache/incubator-pulsar/pull/1186#issuecomment-367896380 ping @merlimat @rdhabalia ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yush1ga opened a new pull request #1272: Add prefix setting for Java reader
yush1ga opened a new pull request #1272: Add prefix setting for Java reader URL: https://github.com/apache/incubator-pulsar/pull/1272 ### Motivation Currently, the reader cannot be used when the subscription auth mode is prefix. ### Modifications Add a prefix setting for the Java reader. ### Result The Java reader can be used when the subscription auth mode is prefix. If the specification is acceptable, we are going to create another PR for the C++ client. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services