[incubator-pulsar] branch asf-site updated: Updated site at revision d073a7c

2018-02-22 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 2b78809  Updated site at revision d073a7c
2b78809 is described below

commit 2b78809cb8e814679af67c38c50e7b5908033f8f
Author: jenkins 
AuthorDate: Thu Feb 22 08:12:00 2018 +

Updated site at revision d073a7c
---
 content/api/client/index-all.html  |  18 +++
 .../org/apache/pulsar/client/api/PulsarClient.html | 126 -
 2 files changed, 139 insertions(+), 5 deletions(-)

diff --git a/content/api/client/index-all.html 
b/content/api/client/index-all.html
index e1f035a..1760ef3 100644
--- a/content/api/client/index-all.html
+++ b/content/api/client/index-all.html
@@ -1597,6 +1597,14 @@
 Use PulsarClient.newConsumer()
 to build a new consumer
 
 
+subscribe(Collection,
 String) - Method in interface org.apache.pulsar.client.api.PulsarClient
+
+Subscribe to the given topic and subscription combination 
with default ConsumerConfiguration
+
+subscribe(Collection,
 String, ConsumerConfiguration) - Method in interface 
org.apache.pulsar.client.api.PulsarClient
+
+Subscribe to the given topics and subscription combination 
using given ConsumerConfiguration
+
 subscribeAsync()
 - Method in interface org.apache.pulsar.client.api.ConsumerBuilder
 
 Finalize the Consumer creation by subscribing 
to the topic in asynchronous mode.
@@ -1613,6 +1621,16 @@
 Use PulsarClient.newConsumer()
 to build a new consumer
 
 
+subscribeAsync(Collection,
 String) - Method in interface org.apache.pulsar.client.api.PulsarClient
+
+Asynchronously subscribe to the given topics and 
subscription combination with
+ default ConsumerConfiguration
+
+subscribeAsync(Collection,
 String, ConsumerConfiguration) - Method in interface 
org.apache.pulsar.client.api.PulsarClient
+
+Asynchronously subscribe to the given topics and 
subscription combination using given
+ ConsumerConfiguration
+
 subscriptionName(String)
 - Method in interface org.apache.pulsar.client.api.ConsumerBuilder
 
 Specify the subscription name for this consumer.
diff --git a/content/api/client/org/apache/pulsar/client/api/PulsarClient.html 
b/content/api/client/org/apache/pulsar/client/api/PulsarClient.html
index 0d6a841..86c6a10 100644
--- a/content/api/client/org/apache/pulsar/client/api/PulsarClient.html
+++ b/content/api/client/org/apache/pulsar/client/api/PulsarClient.html
@@ -17,7 +17,7 @@
 catch(err) {
 }
 //-->
-var methods = 
{"i0":17,"i1":6,"i2":6,"i3":49,"i4":49,"i5":38,"i6":38,"i7":38,"i8":38,"i9":38,"i10":38,"i11":6,"i12":6,"i13":6,"i14":6,"i15":38,"i16":38,"i17":38,"i18":38};
+var methods = 
{"i0":17,"i1":6,"i2":6,"i3":49,"i4":49,"i5":38,"i6":38,"i7":38,"i8":38,"i9":38,"i10":38,"i11":6,"i12":6,"i13":6,"i14":6,"i15":6,"i16":6,"i17":38,"i18":38,"i19":6,"i20":6,"i21":38,"i22":38};
 var tabs = {65535:["t0","All Methods"],1:["t1","Static 
Methods"],2:["t2","Instance Methods"],4:["t3","Abstract 
Methods"],16:["t5","Default Methods"],32:["t6","Deprecated Methods"]};
 var altColor = "altColor";
 var rowColor = "rowColor";
@@ -242,6 +242,21 @@ extends Closeable
 
 
 Consumer
+subscribe(Collection topics,
+ String subscription)
+Subscribe to the given topic and subscription combination 
with default ConsumerConfiguration
+
+
+
+Consumer
+subscribe(Collection topics,
+ String subscription,
+ ConsumerConfiguration conf)
+Subscribe to the given topics and subscription combination 
using given ConsumerConfiguration
+
+
+
+Consumer
 subscribe(String topic,
  String subscription)
 Deprecated. 
@@ -249,7 +264,7 @@ extends Closeable
 
 
 
-
+
 Consumer
 subscribe(String topic,
  String subscription,
@@ -259,7 +274,24 @@ extends Closeable
 
 
 
-
+
+CompletableFuture
+subscribeAsync(Collection topics,
+  String subscription)
+Asynchronously subscribe to the given topics and 
subscription combination with
+ default ConsumerConfiguration
+
+
+
+CompletableFuture
+subscribeAsync(Collection topics,
+  String subscription,
+  ConsumerConfiguration conf)
+Asynchronously subscribe to the given topics and 
subscription combination using given
+ ConsumerConfiguration
+
+
+
 CompletableFuture
 subscribeAsync(String topic,
   String subscription)
@@ -268,7 +300,7 @@ extends Closeable
 
 
 
-
+
 CompletableFuture
 subscribeAsync(String topic,
   String subscription,
@@ -706,7 +738,7 @@ CompletableFuture<
 
 
-
+
 
 shutdown
 void shutdown()
@@ -720,6 +752,90 @@ CompletableFuture<
+
+
+
+
+subscribe
+Consumer subscribe(Collection topics,
+   String subscription)
+throws PulsarClientException
+Subscribe to the given topic and subscription combination 

[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898353
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 ##
 @@ -148,20 +148,23 @@ public void testSyncProducerAndConsumer() throws 
Exception {
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
-
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
 // 1. producer connect
-Producer producer1 = pulsarClient.createProducer(topicName1);
-Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
-Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+.create();
+Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
+Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
 
 Review comment:
   `receiverQueueSize(4)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898450
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 ##
 @@ -205,20 +208,23 @@ public void testAsyncConsumer() throws Exception {
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
-
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
 // 1. producer connect
-Producer producer1 = pulsarClient.createProducer(topicName1);
-Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
-Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+.create();
+Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
+Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
 
 Review comment:
   `receiverQueueSize(4)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169899089
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 ##
 @@ -77,22 +81,51 @@ public Consumer subscribe() throws PulsarClientException {
 
 @Override
 public CompletableFuture subscribeAsync() {
-if (topicName == null) {
+if (topicNames == null || topicNames.isEmpty()) {
 return FutureUtil
-.failedFuture(new IllegalArgumentException("Topic name 
must be set on the producer builder"));
+.failedFuture(new IllegalArgumentException("Topic name 
must be set on the consumer builder"));
 }
 
 if (subscriptionName == null) {
 return FutureUtil.failedFuture(
-new IllegalArgumentException("Subscription name must be 
set on the producer builder"));
+new IllegalArgumentException("Subscription name must be 
set on the consumer builder"));
 }
 
-return client.subscribeAsync(topicName, subscriptionName, conf);
+if (topicNames.size() == 1) {
+return client.subscribeAsync(topicNames.get(0), subscriptionName, 
conf);
+} else {
+return client.subscribeAsync(topicNames, subscriptionName, conf);
+}
 }
 
 @Override
-public ConsumerBuilder topic(String topicName) {
-this.topicName = topicName;
+public ConsumerBuilder topic(String... topicNames) {
+checkArgument(topicNames.length > 0, "Passed in topicNames should not 
be empty.");
+if (this.topicNames == null) {
+this.topicNames = Lists.newArrayList();
+}
+
+for (int i = 0; i < topicNames.length; i++) {
+if (!this.topicNames.contains(topicNames[i])) {
+this.topicNames.add(topicNames[i]);
+}
+}
+return this;
+}
+
+@Override
+public ConsumerBuilder topics(List topicNames) {
+checkArgument(topicNames != null && !topicNames.isEmpty(),
+"Passed in topicNames list should not be empty.");
+if (this.topicNames == null) {
+this.topicNames = topicNames;
+} else {
+topicNames.forEach(name -> {
 
 Review comment:
   this.topicNames.addAll(topicNames)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898223
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 ##
 @@ -103,18 +101,20 @@ public void testGetConsumersAndGetTopics() throws 
Exception {
 final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
 final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
 final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
-List topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+List topicNames = Lists.newArrayList(topicName1, topicName2);
 
 admin.properties().createProperty("prop", new PropertyAdmin());
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.topic(topicName3)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
 
 Review comment:
   you need configure `receiverQueueSize(4)` no? there was 
`setReceiverQueueSize(4)` at ConsumerConfiguration?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898053
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 ##
 @@ -82,12 +79,13 @@ public void testDifferentTopicsNameSubscribe() throws 
Exception {
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
 try {
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
 
 Review comment:
   you need configure `receiverQueueSize(4)` no? there was 
`setReceiverQueueSize(4)` at ConsumerConfiguration?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898906
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 ##
 @@ -40,9 +44,9 @@
 private static final long serialVersionUID = 1L;
 
 private final PulsarClientImpl client;
-private String topicName;
 private String subscriptionName;
 private final ConsumerConfiguration conf;
+private List topicNames;
 
 Review comment:
   I would suggest using Set instead of List, so this allows us removing 
duplicated topic names.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
sijie commented on a change in pull request #1269: Issue #1237: support builder 
for topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#discussion_r169898511
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 ##
 @@ -280,20 +286,23 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
-
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
 // 1. producer connect
-Producer producer1 = pulsarClient.createProducer(topicName1);
-Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
-Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+.create();
+Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
+Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+.create();
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
 
 Review comment:
   `receiverQueueSize(4)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
zhaijack commented on issue #1269: Issue #1237: support builder for 
topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269#issuecomment-367628188
 
 
   @sijie Thanks, have changed it. receiverQueueSize was added to test some 
functionality. added it back here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1259: Is there any relationship between Yahoo Message Broker and Pulsar projects?

2018-02-22 Thread GitBox
merlimat commented on issue #1259: Is there any relationship between Yahoo 
Message Broker and Pulsar projects?
URL: 
https://github.com/apache/incubator-pulsar/issues/1259#issuecomment-367759969
 
 
   > Am I right to choose Pulsar as the right option to realize the 
application? 
   
   That is exactly the same scenario as Sherpa use case, so I would definitely 
say that it is a good match. We literally designed for that use case :)
   
   > would you give me some advice? thanks a lot!
   
   Sure, shoot up any question/doubts either here, in the mailing lists or in 
the Slack channel.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia closed pull request #1268: Add get-peer clusters admin api

2018-02-22 Thread GitBox
rdhabalia closed pull request #1268: Add get-peer clusters admin api
URL: https://github.com/apache/incubator-pulsar/pull/1268
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 733105a37..de670c136 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -213,6 +213,28 @@ public void setPeerClusterNames(@PathParam("cluster") 
String cluster, LinkedHash
 }
 }
 
+   @GET
+   @Path("/{cluster}/peers")
+   @ApiOperation(value = "Get the peer-cluster data for the specified 
cluster.", response = Set.class)
+   @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+   @ApiResponse(code = 404, message = "Cluster doesn't 
exist") })
+   public Set getPeerCluster(@PathParam("cluster") String cluster) 
{
+   validateSuperUserAccess();
+
+   try {
+   String clusterPath = path("clusters", cluster);
+   byte[] content = globalZk().getData(clusterPath, null, 
null);
+   ClusterData clusterData = 
jsonMapper().readValue(content, ClusterData.class);
+   return clusterData.getPeerClusterNames();
+   } catch (KeeperException.NoNodeException e) {
+   log.warn("[{}] Failed to get cluster {}: Does not 
exist", clientAppId(), cluster);
+   throw new RestException(Status.NOT_FOUND, "Cluster does 
not exist");
+   } catch (Exception e) {
+   log.error("[{}] Failed to get cluster {}", 
clientAppId(), cluster, e);
+   throw new RestException(e);
+   }
+   }
+
 @DELETE
 @Path("/{cluster}")
 @ApiOperation(value = "Delete an existing cluster")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index c344af55a..d5456fcca 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
+import java.util.LinkedHashSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -151,6 +154,22 @@ public void testPeerClusterTopicLookup(String protocol) 
throws Exception {
 
 }
 
+   @Test
+   public void testGetPeerClusters() throws Exception {
+   final String mainClusterName = "r1";
+   
assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null);
+   LinkedHashSet peerClusters = 
Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3"));
+   admin1.clusters().updatePeerClusterNames(mainClusterName, 
peerClusters);
+   retryStrategically((test) -> {
+   try {
+   return 
admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1;
+   } catch (PulsarAdminException e) {
+   return false;
+   }
+   }, 5, 100);
+   
assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), 
peerClusters);
+   }
+
 private static final Logger log = 
LoggerFactory.getLogger(PeerReplicatorTest.class);
 
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 779e7b117..4048367de 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -21,6 +21,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.pulsar.client.a

[incubator-pulsar] branch master updated: Add get-peer clusters admin api (#1268)

2018-02-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7b07fce  Add get-peer clusters admin api (#1268)
7b07fce is described below

commit 7b07fce3ccf633f705b5bbe1de3248c5abdcac10
Author: Rajan Dhabalia 
AuthorDate: Thu Feb 22 11:06:19 2018 -0800

Add get-peer clusters admin api (#1268)
---
 .../pulsar/broker/admin/impl/ClustersBase.java | 22 +
 .../pulsar/broker/service/PeerReplicatorTest.java  | 19 ++
 .../org/apache/pulsar/client/admin/Clusters.java   | 23 ++
 .../pulsar/client/admin/internal/ClustersImpl.java | 10 ++
 .../org/apache/pulsar/admin/cli/CmdClusters.java   | 14 +
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  3 +++
 6 files changed, 91 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 733105a..de670c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -213,6 +213,28 @@ public class ClustersBase extends AdminResource {
 }
 }
 
+   @GET
+   @Path("/{cluster}/peers")
+   @ApiOperation(value = "Get the peer-cluster data for the specified 
cluster.", response = Set.class)
+   @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+   @ApiResponse(code = 404, message = "Cluster doesn't 
exist") })
+   public Set getPeerCluster(@PathParam("cluster") String cluster) 
{
+   validateSuperUserAccess();
+
+   try {
+   String clusterPath = path("clusters", cluster);
+   byte[] content = globalZk().getData(clusterPath, null, 
null);
+   ClusterData clusterData = 
jsonMapper().readValue(content, ClusterData.class);
+   return clusterData.getPeerClusterNames();
+   } catch (KeeperException.NoNodeException e) {
+   log.warn("[{}] Failed to get cluster {}: Does not 
exist", clientAppId(), cluster);
+   throw new RestException(Status.NOT_FOUND, "Cluster does 
not exist");
+   } catch (Exception e) {
+   log.error("[{}] Failed to get cluster {}", 
clientAppId(), cluster, e);
+   throw new RestException(e);
+   }
+   }
+
 @DELETE
 @Path("/{cluster}")
 @ApiOperation(value = "Delete an existing cluster")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index c344af5..d5456fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
+import java.util.LinkedHashSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -151,6 +154,22 @@ public class PeerReplicatorTest extends ReplicatorTestBase 
{
 
 }
 
+   @Test
+   public void testGetPeerClusters() throws Exception {
+   final String mainClusterName = "r1";
+   
assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null);
+   LinkedHashSet peerClusters = 
Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3"));
+   admin1.clusters().updatePeerClusterNames(mainClusterName, 
peerClusters);
+   retryStrategically((test) -> {
+   try {
+   return 
admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1;
+   } catch (PulsarAdminException e) {
+   return false;
+   }
+   }, 5, 100);
+   
assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), 
peerClusters);
+   }
+
 private static final Logger log = 
LoggerFactory.getLogger(PeerReplicatorTest.class);
 
 }
diff --git 
a/pulsar-

[GitHub] merlimat opened a new pull request #1270: Added ManagedLedger perf tool

2018-02-22 Thread GitBox
merlimat opened a new pull request #1270: Added ManagedLedger perf tool
URL: https://github.com/apache/incubator-pulsar/pull/1270
 
 
   ### Motivation
   
   Added tool to do performance tests directly on BookKeeper, using the 
ManagedLedger API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1269: Issue #1237: support builder for topicsConsumer

2018-02-22 Thread GitBox
merlimat closed pull request #1269: Issue #1237: support builder for 
topicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index a2cb6d579..2aa386e71 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -33,12 +33,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -82,12 +79,13 @@ public void testDifferentTopicsNameSubscribe() throws 
Exception {
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
 try {
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+.subscribe();
 fail("subscribe for topics from different namespace should fail.");
 } catch (IllegalArgumentException e) {
 // expected for have different namespace
@@ -103,18 +101,21 @@ public void testGetConsumersAndGetTopics() throws 
Exception {
 final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
 final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
 final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
-List topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+List topicNames = Lists.newArrayList(topicName1, topicName2);
 
 admin.properties().createProperty("prop", new PropertyAdmin());
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.topic(topicName3)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+.receiverQueueSize(4)
+.subscribe();
 assertTrue(consumer instanceof TopicsConsumerImpl);
 
 List topics = ((TopicsConsumerImpl) 
consumer).getPartitionedTopics();
@@ -148,20 +149,24 @@ public void testSyncProducerAndConsumer() throws 
Exception {
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
-
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
 // 1. producer connect
-Producer producer1 = pulsarClient.createProducer(topicName1);
-Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
-Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+.create();
+Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+

[incubator-pulsar] branch master updated: Issue #1237: support builder for topicsConsumer (#1269)

2018-02-22 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 68cd115  Issue #1237: support builder for topicsConsumer (#1269)
68cd115 is described below

commit 68cd1154e9581bb85032da5a8e1d44336b25
Author: Jia Zhai 
AuthorDate: Thu Feb 22 11:38:59 2018 -0800

Issue #1237: support builder for topicsConsumer (#1269)

* support builder for topicsConsumer

* change following @Matteo's comments to remove subscribe methods in client

* change to topic(String ... names) for consumer builder

* change following @sijie's comments
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 197 +++--
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  16 +-
 .../org/apache/pulsar/client/api/PulsarClient.java |  57 --
 .../pulsar/client/impl/ConsumerBuilderImpl.java|  41 -
 .../pulsar/client/impl/PulsarClientImpl.java   |  32 
 5 files changed, 186 insertions(+), 157 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index a2cb6d5..2aa386e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -33,12 +33,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -82,12 +79,13 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
 try {
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+.subscribe();
 fail("subscribe for topics from different namespace should fail.");
 } catch (IllegalArgumentException e) {
 // expected for have different namespace
@@ -103,18 +101,21 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
 final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
 final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
-List topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+List topicNames = Lists.newArrayList(topicName1, topicName2);
 
 admin.properties().createProperty("prop", new PropertyAdmin());
 admin.persistentTopics().createPartitionedTopic(topicName2, 2);
 admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
 // 2. Create consumer
-ConsumerConfiguration conf = new ConsumerConfiguration();
-conf.setReceiverQueueSize(4);
-conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-conf.setSubscriptionType(SubscriptionType.Shared);
-Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+Consumer consumer = pulsarClient.newConsumer()
+.topics(topicNames)
+.topic(topicName3)
+.subscriptionName(subscriptionName)
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+.receiverQueueSize(4)
+.subscribe();
 assertTrue(consumer instanceof TopicsConsumerImpl);
 
 List topics = ((TopicsConsumerImpl) 
consumer).getPartitionedTopics();
@@ -148,20 +149,24 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 admin.persistentTo

[GitHub] lucperkins opened a new pull request #1271: Message deduplication documentation (WIP)

2018-02-22 Thread GitBox
lucperkins opened a new pull request #1271: Message deduplication documentation 
(WIP)
URL: https://github.com/apache/incubator-pulsar/pull/1271
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1165: PIP-13-2/3: support regex based subscription

2018-02-22 Thread GitBox
zhaijack commented on issue #1165: PIP-13-2/3: support regex based subscription
URL: https://github.com/apache/incubator-pulsar/pull/1165#issuecomment-367842452
 
 
   would like to close this one and open a new PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Broker should not start replicator for root partitioned-topic (#1262)

2018-02-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f38a003  Broker should not start replicator for root partitioned-topic 
(#1262)
f38a003 is described below

commit f38a003a2a8331da94996fc7ea871abfc779ae24
Author: Rajan Dhabalia 
AuthorDate: Thu Feb 22 18:30:58 2018 -0800

Broker should not start replicator for root partitioned-topic (#1262)

* Broker should not start replicator for root partitioned-topic

* address comment
---
 .../pulsar/broker/service/AbstractReplicator.java  | 51 +--
 .../pulsar/broker/service/BrokerService.java   | 53 +---
 .../nonpersistent/NonPersistentReplicator.java |  3 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 35 +--
 .../service/persistent/PersistentReplicator.java   |  3 +-
 .../broker/service/persistent/PersistentTopic.java | 39 ++--
 .../pulsar/broker/service/ReplicatorTest.java  | 72 ++
 .../pulsar/broker/service/ReplicatorTestBase.java  |  3 +
 8 files changed, 220 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 49213c9..4642a85 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.broker.service.AbstractReplicator.State;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +61,9 @@ public abstract class AbstractReplicator {
 Stopped, Starting, Started, Stopping
 }
 
-public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-String remoteCluster, BrokerService brokerService) {
+public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+BrokerService brokerService) throws NamingException {
+validatePartitionedTopic(topicName, brokerService);
 this.brokerService = brokerService;
 this.topicName = topicName;
 this.replicatorPrefix = replicatorPrefix;
@@ -69,8 +74,7 @@ public abstract class AbstractReplicator {
 this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
 this.producerBuilder = client.newProducer() //
-.topic(topicName)
-.sendTimeout(0, TimeUnit.SECONDS) //
+.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
 .maxPendingMessages(producerQueueSize) //
 .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
 STATE_UPDATER.set(this, State.Stopped);
@@ -211,5 +215,42 @@ public abstract class AbstractReplicator {
 return (replicatorPrefix + "." + cluster).intern();
 }
 
+/**
+ * Replication can't be started on root-partitioned-topic to avoid 
producer startup conflict.
+ * 
+ * 
+ * eg:
+ * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic 
with 2 partitions then
+ * broker explicitly creates replicator producer for: 
"my-topic-partition-1" and "my-topic-partition-2".
+ * 
+ * However, if broker tries to start producer with root topic "my-topic" 
then client-lib internally creates individual 
+ * producers for "my-topic-partition-1" and "my-topic-partition-2" which 
creates conflict with existing 
+ * replicator producers.
+ * 
+ * 
+ * Therefore, replicator can't be started on root-partition topic which 
can internally create multiple partitioned
+ * producers.
+ * 
+ * @param topicName
+ * @param brokerService
+ */
+private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+DestinationName d

[GitHub] rdhabalia closed pull request #1262: Broker should not start replicator for root partitioned-topic

2018-02-22 Thread GitBox
rdhabalia closed pull request #1262: Broker should not start replicator for 
root partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 49213c90d..4642a8503 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.broker.service.AbstractReplicator.State;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +61,9 @@
 Stopped, Starting, Started, Stopping
 }
 
-public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-String remoteCluster, BrokerService brokerService) {
+public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+BrokerService brokerService) throws NamingException {
+validatePartitionedTopic(topicName, brokerService);
 this.brokerService = brokerService;
 this.topicName = topicName;
 this.replicatorPrefix = replicatorPrefix;
@@ -69,8 +74,7 @@ public AbstractReplicator(String topicName, String 
replicatorPrefix, String loca
 this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
 this.producerBuilder = client.newProducer() //
-.topic(topicName)
-.sendTimeout(0, TimeUnit.SECONDS) //
+.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
 .maxPendingMessages(producerQueueSize) //
 .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
 STATE_UPDATER.set(this, State.Stopped);
@@ -211,5 +215,42 @@ public static String getReplicatorName(String 
replicatorPrefix, String cluster)
 return (replicatorPrefix + "." + cluster).intern();
 }
 
+/**
+ * Replication can't be started on root-partitioned-topic to avoid 
producer startup conflict.
+ * 
+ * 
+ * eg:
+ * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic 
with 2 partitions then
+ * broker explicitly creates replicator producer for: 
"my-topic-partition-1" and "my-topic-partition-2".
+ * 
+ * However, if broker tries to start producer with root topic "my-topic" 
then client-lib internally creates individual 
+ * producers for "my-topic-partition-1" and "my-topic-partition-2" which 
creates conflict with existing 
+ * replicator producers.
+ * 
+ * 
+ * Therefore, replicator can't be started on root-partition topic which 
can internally create multiple partitioned
+ * producers.
+ * 
+ * @param topicName
+ * @param brokerService
+ */
+private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+DestinationName destination = DestinationName.get(topicName);
+String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+destination.getNamespace().toString(), 
destination.getDomain().toString(),
+destination.getEncodedLocalName());
+boolean isPartitionedTopic = false;
+try {
+isPartitionedTopic = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+.get(partitionedTopicPath).isPresent();
+} catch (Exception e) {
+log.warn("Failed to verify partitioned topic {}-{}", topicName, 
e.getMessage());
+}
+if (isPartitionedTopic) {
+throw new NamingException(
+topicName + " is a partitioned-topic and replication can't

[GitHub] sijie commented on issue #1186: Pulsar FAQ

2018-02-22 Thread GitBox
sijie commented on issue #1186: Pulsar FAQ
URL: https://github.com/apache/incubator-pulsar/pull/1186#issuecomment-367896380
 
 
   ping @merlimat @rdhabalia ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yush1ga opened a new pull request #1272: Add prefix setting for Java reader

2018-02-22 Thread GitBox
yush1ga opened a new pull request #1272: Add prefix setting for Java reader
URL: https://github.com/apache/incubator-pulsar/pull/1272
 
 
   ### Motivation
   
   Now reader cannot be used when subscription auth mode is prefix.
   
   ### Modifications
   
   Add prefix setting for Java reader.
   
   ### Result
   
   Java reader can be used when subscription auth mode is prefix.
   If the specification is acceptable, we are going to create another PR for 
C++ client.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services