[GitHub] maskit commented on issue #1225: Enable specification of TLS Protocol Versions and Cipher Suites

2018-02-13 Thread GitBox
maskit commented on issue #1225: Enable specification of TLS Protocol Versions 
and Cipher Suites
URL: https://github.com/apache/incubator-pulsar/pull/1225#issuecomment-365254767
 
 
   @jai1 You may want to read these. If I understand correctly, HTTP 
entry points, which use `SecurityUtility::createSslContext`, may accept any 
protocol version, even if we change the string to "TLS1.2".
   
   
https://docs.oracle.com/javase/9/docs/api/javax/net/ssl/SSLContext.html#getInstance-java.lang.String-
   
https://docs.oracle.com/javase/9/docs/specs/security/standard-names.html#sslcontext-algorithms
   
https://stackoverflow.com/questions/43481010/how-to-enable-only-tlsv1-2-on-java-8-server-application?noredirect=1=1
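
A minimal sketch of the point above, using only JDK classes: the name given to `SSLContext.getInstance` selects a context implementation that, per the linked standard-names page, may also support other protocol versions, so the accepted versions have to be pinned on the engine (or socket) itself. The class and method names here are hypothetical, and how this would be wired into `SecurityUtility::createSslContext` or the HTTP server is not shown and is left as an assumption.

```java
import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Hypothetical helper, not part of Pulsar.
final class TlsProtocolPinning {

    // Creates an engine from the JDK default key/trust material and
    // restricts it to exactly the given protocol versions.
    static SSLEngine newRestrictedEngine(String... protocols) {
        try {
            // The algorithm name alone does not guarantee that only this
            // version is negotiated.
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(null, null, null); // null = JDK defaults

            SSLEngine engine = context.createSSLEngine();
            SSLParameters params = engine.getSSLParameters();
            params.setProtocols(protocols); // the actual restriction
            engine.setSSLParameters(params);
            return engine;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("TLS setup failed", e);
        }
    }

    public static void main(String[] args) {
        SSLEngine engine = newRestrictedEngine("TLSv1.2");
        System.out.println(String.join(",", engine.getEnabledProtocols()));
    }
}
```

Calling `getEnabledProtocols()` on the returned engine is a quick way to check what a given entry point would actually accept.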
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 ##
 @@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
 
 Review comment:
   Thanks. Here it is also mainly a wrapper for the original MessageImpl; the 
original MessageImpl will be used for one internal sub-consumer. Extending it 
may not be easy to handle.




[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 ##
 @@ -0,0 +1,50 @@
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
 
 Review comment:
   Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl. We 
need to keep a reference to MessageIdImpl, because it will be used for one 
internal sub-consumer. 
   If extending, the constructor and getInnerMessageIdInner may not be easy to 
handle.
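
A rough sketch of the wrapper approach described above, with hypothetical stand-in types (`Id`, `SimpleId`, `TopicId`) rather than Pulsar's real `MessageId` and `MessageIdImpl`: the wrapper implements the interface, keeps a reference to the inner ID, and hands that inner ID back when the internal sub-consumer needs it.

```java
import java.util.Objects;

// Hypothetical stand-in for the MessageId interface.
interface Id extends Comparable<Id> {
}

// Hypothetical stand-in for MessageIdImpl.
final class SimpleId implements Id {
    final long ledgerId;
    final long entryId;

    SimpleId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Id other) {
        SimpleId o = (SimpleId) other;
        int byLedger = Long.compare(ledgerId, o.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, o.entryId);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId;
    }
}

// Wrapper by composition: adds the topic name, delegates everything else.
final class TopicId implements Id {
    private final String topicName;
    private final Id inner;

    TopicId(String topicName, Id inner) {
        this.topicName = Objects.requireNonNull(topicName);
        this.inner = Objects.requireNonNull(inner);
    }

    String getTopicName() {
        return topicName;
    }

    // The unwrapped ID is what an internal sub-consumer would be given.
    Id getInnerId() {
        return inner;
    }

    @Override
    public int compareTo(Id other) {
        Id otherInner = other instanceof TopicId ? ((TopicId) other).inner : other;
        return inner.compareTo(otherInner);
    }

    @Override
    public String toString() {
        return topicName + "@" + inner;
    }
}
```

With composition the wrapper needs no knowledge of the inner class's constructor arguments, which is the difficulty with extending that the comment mentions.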




[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 ##
 @@ -0,0 +1,50 @@
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
 
 Review comment:
   Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl. We 
need to keep a reference to MessageIdImpl, because it will be used for each 
internal sub-consumer. 
   If extending, the constructor and getInnerMessageIdInner may not be easy to 
handle.




[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167811273
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
 
 Review comment:
   Thanks. Will add it.




[GitHub] ivankelly opened a new pull request #1231: Read from compacted topic ledger if available and enabled

2018-02-13 Thread GitBox
ivankelly opened a new pull request #1231: Read from compacted topic ledger if 
available and enabled
URL: https://github.com/apache/incubator-pulsar/pull/1231
 
 
   If a topic has been compacted and the client has enabled reads from
   compacted topics, try to read from the compacted ledger if the cursor
   position lands before or within the range of message IDs in the
   compacted topic ledger. If the cursor position lands after the message
   IDs in the compacted topic ledger, read from the cursor as normal.
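
The decision described above can be sketched roughly as follows. This is an illustration only, not the code in the pull request: the class, the enum, and the use of a single `long` to stand in for a message ID position are all assumptions.

```java
// Hypothetical illustration of the read-source decision.
final class CompactedReadChooser {

    enum Source { COMPACTED_LEDGER, CURSOR }

    // cursorPosition and lastCompactedPosition are simplified to longs here;
    // real message IDs are composite values.
    static Source choose(boolean topicCompacted,
                         boolean readCompactedEnabled,
                         long cursorPosition,
                         long lastCompactedPosition) {
        boolean beforeOrWithinCompactedRange = cursorPosition <= lastCompactedPosition;
        if (topicCompacted && readCompactedEnabled && beforeOrWithinCompactedRange) {
            return Source.COMPACTED_LEDGER;
        }
        // Not compacted, not enabled, or cursor is past the compacted IDs.
        return Source.CURSOR;
    }
}
```

For example, with the last compacted position at 10, a cursor at 5 would read from the compacted ledger and a cursor at 11 would read from the cursor as normal.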
   




[GitHub] ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled

2018-02-13 Thread GitBox
ivankelly commented on issue #1231: Read from compacted topic ledger if 
available and enabled
URL: https://github.com/apache/incubator-pulsar/pull/1231#issuecomment-365215391
 
 
   @merlimat 




[GitHub] ivankelly opened a new pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client

2018-02-13 Thread GitBox
ivankelly opened a new pull request #1230: ProxyForwardAuthDataTest shouldn't 
reuse pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230
 
 
   This test was flaking due to the pulsar client being reused. The test
   starts a proxy service, subscribes to a topic, stops the proxy, starts
   a new proxy and tries to subscribe again.
   
   If using the same client for both connections, the client will have an
   open connection to the first proxy, which, when the proxy is stopped,
   will be closed by netty asynchronously. This can race with the second
   subscription attempt, and cause it to fail with
   ConnectionClosedException. Specifically, if the subscription attempt
   runs before the netty callback runs, the subscription attempt will try
   to subscribe on a dead connection.
   
   The immediate fix for this test, to stop the flaking, is just to use a
   different client for each attempt.
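
The race can be modelled deterministically with a toy sketch. `FakeProxy` and `FakeClient` are hypothetical classes, not the Pulsar client or the actual test, and the asynchronous netty callback is reduced to an explicit `onConnectionClosed()` call so the ordering is visible.

```java
// Hypothetical stand-in for a proxy service that can be stopped.
final class FakeProxy {
    private boolean running = true;

    void stop() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical stand-in for a client that caches its connection.
final class FakeClient {
    private FakeProxy cachedConnection;

    // Returns true if the subscription succeeds. A cached connection is
    // reused even if the proxy behind it has already been stopped.
    boolean subscribe(FakeProxy currentProxy) {
        if (cachedConnection == null) {
            cachedConnection = currentProxy;
        }
        return cachedConnection.isRunning();
    }

    // Models the callback that eventually notices the closed connection.
    void onConnectionClosed() {
        cachedConnection = null;
    }
}
```

If a reused client subscribes again before `onConnectionClosed()` has run, it still holds the connection to the stopped proxy and the attempt fails; a fresh client has no cached connection, so it cannot hit that ordering.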
   




[GitHub] ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client

2018-02-13 Thread GitBox
ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse 
pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365204264
 
 
   This patch addresses #1211 




[GitHub] ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client

2018-02-13 Thread GitBox
ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse 
pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365205415
 
 
   @jai1 @merlimat @mgodave 




svn commit: r25025 - /dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/

2018-02-13 Thread jai1
Author: jai1
Date: Tue Feb 13 09:59:07 2018
New Revision: 25025

Log:
Staging artifacts and signature for Pulsar release 1.22.0-incubating

Added:
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/

dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz
   (with props)

dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc

dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5

dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512

Added: 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz
==
Binary file - no diff available.

Propchange: 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz
--
svn:mime-type = application/octet-stream

Added: 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc
==
--- 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc
 (added)
+++ 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc
 Tue Feb 13 09:59:07 2018
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEEDQCP4t9TLRC/fG0se6GmTLvBFOwFAlqCtq8ACgkQe6GmTLvB
+FOyEwhAAt0OFdejzQTJi7KA2Qv+mhazVuBEZfygwyRjNy7pNoX+CI0tIDCo6myDd
+jfUjx3Fg6hFR3WGzLW0+n+meHxsWrcpiqoR6vV7PjH+CpxJXY6+ZrS3/0QuDdAT/
+/ZlSStC8S3AT6mSRJzjab9HGWIwsE+QZmuiLzjzkUTBDNZ4QAstbN57MY4DrDDe3
+HbSKoCDrPWp+eWmuimD2U1fhxMpPKaUonXLvH1jAhsLYNUt+yS7o0UOlFm+gbktK
+h2t4T+t2QeHzefkh8+Tud1EV0wObTbel54qda3LKGwgGUwPCMs7UxZ/oeSJTd25E
+oKAizymEcdcnpPHBYfq7pr/QhPW2bpO1OVjVKBk8mN3SjVO1366MawxrAfr1AKby
+wQcy8kVN2YskM0VSYl+Ck5BRJl//NO59oGFIPtp7IXy0F/yqokwkVJC4I7YPM3ua
+r5Ozixl8+NFDPE+8SVrmUASiHJLbjG7RBT3SyWuzEMV642uQY6FHp6NMlncBczkK
+slkXz/U5i+E/GM+5Q+MZ7HmwzrCmicYgqiUI8pryYLLPxRjtpiZ9f074jiRtXxFO
+5HRZFackUg0Zz2e0pBhv8FCfLvMc4NiCt8LvqmcPNnXEjMc6m4/Vz+6UheLRBRK+
+wnVxieyKxZYDxFe47xorGMwuGa7M9S4BIWxBuVQ6/lVi2TIQXQw=
+=KFvU
+-END PGP SIGNATURE-

Added: 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5
==
--- 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5
 (added)
+++ 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5
 Tue Feb 13 09:59:07 2018
@@ -0,0 +1,2 @@
+apache-pulsar-1.22.0-incubating-src.tar.gz: 
+29 F0 89 45 04 08 AD 06  6F 75 0C B7 91 6F F9 9F

Added: 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512
==
--- 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512
 (added)
+++ 
dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512
 Tue Feb 13 09:59:07 2018
@@ -0,0 +1,3 @@
+apache-pulsar-1.22.0-incubating-src.tar.gz: 
+52ED243F 8DE5BAFA BC430118 9629D216 577AC2AF 4B186317 A8BC513C 4D87FE38 
82C7088F
+ ED1E9B1E D7CAD96D A5287730 15A55080 32BA182A D9A60619 C6813D48




[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167813669
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription,
+   ConsumerConfiguration conf, ExecutorService listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+
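
The pause and resume behaviour described in the field comments of the quoted diff (`pausedConsumers`, `sharedQueueResumeThreshold`) can be sketched in a simplified, single-threaded form. This is an assumption-laden illustration, not the code from the pull request: the class and method names are hypothetical, and the real implementation uses concurrent collections and `receiveAsync()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded model of a shared incoming queue.
final class SharedQueueBackpressure {
    private final int maxQueueSize;
    private final int resumeThreshold; // mirrors maxReceiverQueueSize / 2
    private final Queue<String> sharedQueue = new ArrayDeque<>();
    private final Queue<String> pausedConsumers = new ArrayDeque<>();

    SharedQueueBackpressure(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
        this.resumeThreshold = maxQueueSize / 2;
    }

    // Enqueues a message. Returns false if the delivering consumer was
    // paused because the shared queue is now full.
    boolean onMessage(String consumer, String message) {
        sharedQueue.add(message);
        if (sharedQueue.size() >= maxQueueSize) {
            pausedConsumers.add(consumer);
            return false;
        }
        return true;
    }

    // Hands one message to the application and resumes every paused
    // consumer once the queue has dropped below the threshold.
    String take() {
        String message = sharedQueue.poll();
        if (sharedQueue.size() < resumeThreshold) {
            pausedConsumers.clear(); // stands in for restarting receives
        }
        return message;
    }

    int pausedCount() {
        return pausedConsumers.size();
    }
}
```

Resuming at half the queue size rather than as soon as one slot frees up avoids pausing and resuming the same consumers on every message when the queue hovers near full.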

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167879404
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 ##
 @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
 }
 
 @Override
-public void redeliverUnacknowledgedMessages(Set messageIds) 
{
+public void redeliverUnacknowledgedMessages(Set messageIds) {
 
 Review comment:
   Oh, it is in ConsumerBase.java; sorry for the wrong reference to 
Consumer.java. 
   TopicsMessageIdImpl needs to be more of a wrapper for MessageIdImpl, and, as 
in the reply below, it may be better to implement MessageId instead of 
extending MessageIdImpl.




[GitHub] sijie commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
sijie commented on issue #1066: Issue 937: add CommandGetLastMessageId to make 
reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-365286252
 
 
   @zhaijack: can you address @merlimat's and @ivankelly's comments? We need 
this change for the function worker. 




[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167907583
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@

[GitHub] zhaijack commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli)

2018-02-13 Thread GitBox
zhaijack commented on issue #1059: Issue 1014: Rename "global zookeeper" to 
"configuration-store"(change in code, conf and cli)
URL: https://github.com/apache/incubator-pulsar/pull/1059#issuecomment-365298276
 
 
   retest this please




[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167911587
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 ##
 @@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
 
 Review comment:
   Thanks. This class is mainly a wrapper around the original MessageImpl, 
which is still used by each internal sub-consumer. Extending MessageImpl 
instead would make the constructor and the getters harder to handle.
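
   As a rough illustration of the wrapper approach described above (not the 
actual TopicMessageImpl code), a delegating wrapper might look like the sketch 
below. `SimpleMessage`, `BasicMessage` and `TopicTaggedMessage` are 
hypothetical stand-ins invented for this example; they are not Pulsar types.

```java
// Hypothetical stand-in for the client's Message interface (not the real Pulsar API).
interface SimpleMessage {
    byte[] getData();
    String getKey();
}

// Hypothetical stand-in for the message type produced by one internal sub-consumer.
final class BasicMessage implements SimpleMessage {
    private final String key;
    private final byte[] data;

    BasicMessage(String key, byte[] data) {
        this.key = key;
        this.data = data;
    }

    @Override public byte[] getData() { return data; }
    @Override public String getKey() { return key; }
}

// Wrapper: adds the topic name and delegates every other call to the wrapped message.
final class TopicTaggedMessage implements SimpleMessage {
    private final String topicName;
    private final SimpleMessage delegate;

    TopicTaggedMessage(String topicName, SimpleMessage delegate) {
        this.topicName = topicName;
        this.delegate = delegate;
    }

    String getTopicName() { return topicName; }
    @Override public byte[] getData() { return delegate.getData(); }
    @Override public String getKey() { return delegate.getKey(); }
}

final class WrapperSketch {
    public static void main(String[] args) {
        SimpleMessage inner = new BasicMessage("k1", new byte[] {1, 2});
        TopicTaggedMessage tagged = new TopicTaggedMessage("my-topic", inner);
        System.out.println(tagged.getTopicName() + " " + tagged.getKey());
    }
}
```

   Because the wrapper only delegates, the constructors of the wrapped class 
stay untouched, which is the difficulty raised above for the inheritance 
alternative.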


[GitHub] zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to 
make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-365287664
 
 
   Thanks. I have resolved all the comments and fixed the rebase conflict.


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167906488
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   Thanks, I will add a class UnAckedTopicMessageTracker that extends 
UnAckedMessageTracker.


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915637
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription,
+   ConsumerConfiguration conf, ExecutorService listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915413
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription,
+   ConsumerConfiguration conf, ExecutorService listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915437
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription,
+   ConsumerConfiguration conf, ExecutorService listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
 ##
 @@ -37,6 +36,16 @@
  */
 byte[] toByteArray();
 
+/**
+ * Get the topic name of this MessageId.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   Yes, it is as @merlimat mentioned: for MessageId, we also need `topicName` 
for `redeliverUnacknowledgedMessages`.
   I will remove it from MessageId.


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find by topic name
+private final ConcurrentHashMap consumers;
 
 Review comment:
   Thanks. Yes, there is a plan to make this class extend 
PartitionedConsumerImpl, which is also why this change keeps using many of the 
same interfaces. 
   But it may be better to keep them separate at first, to avoid introducing 
bugs into PartitionedConsumerImpl.


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914612
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
 ##
 @@ -124,4 +124,14 @@
  * @return the key of the message
  */
 String getKey();
+
+/**
+ * Get the topic name of this message.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
 
 Review comment:
   Thanks, will change it.


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914515
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
 ##
 @@ -124,4 +124,14 @@
  * @return the key of the message
  */
 String getKey();
+
+/**
+ * Get the topic name of this message.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   Thanks, will change it.


[GitHub] mgodave commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client

2018-02-13 Thread GitBox
mgodave commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse 
pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365314914
 
 
   Looks good to me, someone with permissions will have to review it though.


[GitHub] sijie commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
sijie commented on issue #1156: Introduce ActiveConsumerListener for realizing 
if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-365325159
 
 
   @merlimat I have addressed your comments; please take another look. If this 
looks good to you, I will rebase it onto the latest master.


[incubator-pulsar] branch master updated: ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)

2018-02-13 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 56ebe36  ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)
56ebe36 is described below

commit 56ebe368ec2eb8569218ca4b438ee53f2bf5145e
Author: Ivan Kelly 
AuthorDate: Tue Feb 13 18:54:29 2018 +0100

ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)

* ProxyForwardAuthDataTest shouldn't reuse pulsar client

This test was flaking due to the pulsar client being reused. The test
starts a proxy service, subscribes to a topic, stops the proxy, starts
a new proxy and tries to subscribe again.

If using the same client for both connections, the client will have an
open connection to the first proxy, which, when the proxy is stopped,
will be closed by netty asynchronously. This can race with the second
subscription attempt, and cause it to fail with
ConnectionClosedException. Specifically, if the subscription attempt
runs before the netty callback runs, the subscription attempt will try
to subscribe on a dead connection.

The immediate fix for this test, to stop the flaking, is just to use a
different client for each attempt.

* Remove trailing whitespace
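
The race described above can be modelled roughly as follows. This is a
simplified, single-threaded sketch and not the test or client code: `FakeProxy`,
`FakeConnection` and `FakeClient` are hypothetical names, and the asynchronous
netty close callback is represented by an explicit `runCloseCallback()` call.

```java
// Hypothetical proxy: once stopped, connections to it are dead.
final class FakeProxy {
    private boolean running = true;
    boolean isRunning() { return running; }
    void stop() { running = false; }
}

// Hypothetical connection: dead as soon as its proxy stops, but the client
// only finds out when the (simulated) asynchronous close callback has run.
final class FakeConnection {
    private final FakeProxy proxy;
    private boolean closeCallbackRan = false;

    FakeConnection(FakeProxy proxy) { this.proxy = proxy; }
    boolean isUsable() { return proxy.isRunning(); }
    boolean isKnownClosed() { return closeCallbackRan; }
    void runCloseCallback() { closeCallbackRan = true; }
}

// Hypothetical client that caches its connection across subscriptions.
final class FakeClient {
    private FakeConnection cached;

    // Returns true if the subscription succeeds.
    boolean subscribe(FakeProxy current) {
        if (cached == null || cached.isKnownClosed()) {
            cached = new FakeConnection(current);
        }
        return cached.isUsable();
    }

    FakeConnection cachedConnection() { return cached; }
}

final class ReusedClientRaceSketch {
    // Shared client, second subscribe runs BEFORE the close callback: fails.
    static boolean sharedClientBeforeCallback() {
        FakeProxy first = new FakeProxy();
        FakeClient client = new FakeClient();
        client.subscribe(first);
        first.stop();
        return client.subscribe(new FakeProxy());
    }

    // Shared client, second subscribe runs AFTER the close callback: succeeds.
    static boolean sharedClientAfterCallback() {
        FakeProxy first = new FakeProxy();
        FakeClient client = new FakeClient();
        client.subscribe(first);
        first.stop();
        client.cachedConnection().runCloseCallback();
        return client.subscribe(new FakeProxy());
    }

    // Fresh client per attempt: no cached connection, so no race.
    static boolean freshClient() {
        FakeProxy first = new FakeProxy();
        new FakeClient().subscribe(first);
        first.stop();
        return new FakeClient().subscribe(new FakeProxy());
    }

    public static void main(String[] args) {
        System.out.println("shared, before callback: " + sharedClientBeforeCallback());
        System.out.println("shared, after callback:  " + sharedClientAfterCallback());
        System.out.println("fresh client:            " + freshClient());
    }
}
```

In this model only the ordering of the close callback decides whether the
shared client succeeds, which is why using a separate client per attempt
removes the flakiness.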
---
 .../proxy/server/ProxyForwardAuthDataTest.java | 55 +-
 1 file changed, 23 insertions(+), 32 deletions(-)

diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 2309ebb..d4b8e4d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -25,7 +25,6 @@ import java.util.Set;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -48,7 +47,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
 private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
 private int webServicePort;
 private int servicePort;
-
+
 @BeforeMethod
 @Override
 protected void setup() throws Exception {
@@ -60,11 +59,11 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
 conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
 conf.setBrokerClientAuthenticationParameters("authParam:broker");
 conf.setAuthenticateOriginalAuthData(true);
-
+
 Set superUserRoles = new HashSet();
 superUserRoles.add("admin");
 conf.setSuperUserRoles(superUserRoles);
-
+
 Set providers = new HashSet();
 providers.add(BasicAuthenticationProvider.class.getName());
 conf.setAuthenticationProviders(providers);
@@ -79,9 +78,9 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
 
 @Override
 protected void cleanup() throws Exception {
-super.internalCleanup();   
+super.internalCleanup();
 }
-
+
 @Test
 void testForwardAuthData() throws Exception {
 log.info("-- Starting {} test --", methodName);
@@ -95,15 +94,13 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
 String subscriptionName = "my-subscriber-name";
 String clientAuthParams = "authParam:client";
 String proxyAuthParams = "authParam:proxy";
-
+
 admin.properties().createProperty("my-property",
 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
 admin.namespaces().createNamespace(namespaceName);
-
 admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 
-
 // Step 2: Run Pulsar Proxy without forwarding authData - expect 
Exception
 ProxyConfiguration proxyConfig = new ProxyConfiguration();
 proxyConfig.setAuthenticationEnabled(true);
@@ -111,35 +108,29 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
 proxyConfig.setServicePort(servicePort);
 proxyConfig.setWebServicePort(webServicePort);
 proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
-
 

[GitHub] merlimat closed pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client

2018-02-13 Thread GitBox
merlimat closed pull request #1230: ProxyForwardAuthDataTest shouldn't reuse 
pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 2309ebbf3..d4b8e4d86 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -25,7 +25,6 @@
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -48,7 +47,7 @@
 private static final Logger log = 
LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
 private int webServicePort;
 private int servicePort;
-
+
 @BeforeMethod
 @Override
 protected void setup() throws Exception {
@@ -60,11 +59,11 @@ protected void setup() throws Exception {
 
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
 conf.setBrokerClientAuthenticationParameters("authParam:broker");
 conf.setAuthenticateOriginalAuthData(true);
-
+
 Set superUserRoles = new HashSet();
 superUserRoles.add("admin");
 conf.setSuperUserRoles(superUserRoles);
-
+
 Set providers = new HashSet();
 providers.add(BasicAuthenticationProvider.class.getName());
 conf.setAuthenticationProviders(providers);
@@ -79,9 +78,9 @@ protected void setup() throws Exception {
 
 @Override
 protected void cleanup() throws Exception {
-super.internalCleanup();   
+super.internalCleanup();
 }
-
+
 @Test
 void testForwardAuthData() throws Exception {
 log.info("-- Starting {} test --", methodName);
@@ -95,15 +94,13 @@ void testForwardAuthData() throws Exception {
 String subscriptionName = "my-subscriber-name";
 String clientAuthParams = "authParam:client";
 String proxyAuthParams = "authParam:proxy";
-
+
 admin.properties().createProperty("my-property",
 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
 admin.namespaces().createNamespace(namespaceName);
-
 admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 
-
 // Step 2: Run Pulsar Proxy without forwarding authData - expect 
Exception
 ProxyConfiguration proxyConfig = new ProxyConfiguration();
 proxyConfig.setAuthenticationEnabled(true);
@@ -111,35 +108,29 @@ void testForwardAuthData() throws Exception {
 proxyConfig.setServicePort(servicePort);
 proxyConfig.setWebServicePort(webServicePort);
 proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
-
 
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
 proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
 
 Set providers = new HashSet<>();
 providers.add(BasicAuthenticationProvider.class.getName());
 proxyConfig.setAuthenticationProviders(providers);
-ProxyService proxyService = new ProxyService(proxyConfig);
-
-proxyService.start();
-PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams);
-Consumer consumer;
-boolean exceptionOccured = false;
-try {
-consumer = proxyClient.subscribe(topicName, subscriptionName);
-} catch(Exception ex) {
-exceptionOccured  = true;
-} 
-Assert.assertTrue(exceptionOccured);
-proxyService.close();
-
+
+try (ProxyService proxyService = new ProxyService(proxyConfig);
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams)) {
+proxyService.start();
+proxyClient.subscribe(topicName, subscriptionName);
+Assert.fail("Shouldn't be able to subscribe, auth required");
+} catch (PulsarClientException.AuthorizationException e) {
+// expected behaviour
+}
+
 // Step 3: Create proxy with 

[GitHub] merlimat commented on a change in pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167959766
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -22,6 +22,20 @@ package pulsar.proto;
 option java_package = "org.apache.pulsar.common.api.proto";
 option optimize_for = LITE_RUNTIME;
 
+message Schema {
+   required string name = 1;
+   required bytes version = 2;
+   required bytes schema_data = 7;
+repeated KeyValue properties = 8;
+}
 
 Review comment:
   How are we identifying the schema type (e.g., JSON vs. Avro vs. others)?




[GitHub] merlimat commented on a change in pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167959515
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -272,6 +289,8 @@ message CommandProducer {
 
 /// Add optional metadata key=value to this producer
 repeated KeyValue metadata= 6;
+
+   optional int64 schema_version = 7;
 
 Review comment:
   I think that when creating a producer or consumer we need to specify the actual 
schema so that the broker can validate it; if validation fails, the producer 
creation fails.




[GitHub] mgodave opened a new pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
mgodave opened a new pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232
 
 
   See #1137 for reference




[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
mgodave commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167994769
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -22,6 +22,20 @@ package pulsar.proto;
 option java_package = "org.apache.pulsar.common.api.proto";
 option optimize_for = LITE_RUNTIME;
 
+message Schema {
+   required string name = 1;
+   required bytes version = 2;
+   required bytes schema_data = 7;
 
 Review comment:
   Yeah definitely, I deleted a few fields and didn't readjust the numbering.




[GitHub] ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled

2018-02-13 Thread GitBox
ivankelly commented on issue #1231: Read from compacted topic ledger if 
available and enabled
URL: https://github.com/apache/incubator-pulsar/pull/1231#issuecomment-365368533
 
 
   I need to fix up the tests on this; I think some mocks are missing implementations.




[incubator-pulsar] branch master updated: Bumped master to 2.0.0-incubating-SNAPSHOT (#1226)

2018-02-13 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e77faf4  Bumped master to 2.0.0-incubating-SNAPSHOT (#1226)
e77faf4 is described below

commit e77faf4acf760447850bd2e4c99acae24d2ae27c
Author: Matteo Merli 
AuthorDate: Tue Feb 13 12:08:58 2018 -0800

Bumped master to 2.0.0-incubating-SNAPSHOT (#1226)
---
 all/pom.xml  | 2 +-
 buildtools/pom.xml   | 2 +-
 managed-ledger/pom.xml   | 2 +-
 pom.xml  | 2 +-
 pulsar-broker-auth-athenz/pom.xml| 2 +-
 pulsar-broker-common/pom.xml | 2 +-
 pulsar-broker-shaded/pom.xml | 2 +-
 pulsar-broker/pom.xml| 2 +-
 pulsar-checksum/pom.xml  | 2 +-
 pulsar-client-admin-shaded/pom.xml   | 2 +-
 pulsar-client-admin/pom.xml  | 2 +-
 pulsar-client-auth-athenz/pom.xml| 2 +-
 pulsar-client-kafka-compat/pom.xml   | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml | 2 +-
 pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml   | 2 +-
 pulsar-client-shaded/pom.xml | 2 +-
 pulsar-client-tools/pom.xml  | 2 +-
 pulsar-client/pom.xml| 2 +-
 pulsar-common/pom.xml| 2 +-
 pulsar-discovery-service/pom.xml | 2 +-
 pulsar-proxy/pom.xml | 2 +-
 pulsar-spark/pom.xml | 2 +-
 pulsar-storm/pom.xml | 2 +-
 pulsar-testclient/pom.xml| 2 +-
 pulsar-websocket/pom.xml | 2 +-
 pulsar-zookeeper-utils/pom.xml   | 2 +-
 pulsar-zookeeper/pom.xml | 2 +-
 27 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/all/pom.xml b/all/pom.xml
index d7c2c98..deddfac 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index fc75fe7..d561bfb 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 34a0792..d3570c6 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pom.xml b/pom.xml
index 7320661..192769f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   org.apache.pulsar
   pulsar
 
-  1.22.0-incubating-SNAPSHOT
+  2.0.0-incubating-SNAPSHOT
 
   Pulsar
   Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml 
b/pulsar-broker-auth-athenz/pom.xml
index 80c22be..48e78e8 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
   
 
   pulsar-broker-auth-athenz
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 7a1d356..666fec8 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
   
 
   pulsar-broker-common
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index bda3037..efcfa76 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 9a703e0..6cd1f89 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml
index da4f778..fb61c6f 100644
--- a/pulsar-checksum/pom.xml
+++ b/pulsar-checksum/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-

[GitHub] merlimat closed pull request #1226: Bumped master to 2.0.0-incubating-SNAPSHOT

2018-02-13 Thread GitBox
merlimat closed pull request #1226: Bumped master to 2.0.0-incubating-SNAPSHOT
URL: https://github.com/apache/incubator-pulsar/pull/1226
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/all/pom.xml b/all/pom.xml
index d7c2c9854..deddfacdf 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index fc75fe739..d561bfbb1 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 34a0792d7..d3570c6db 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pom.xml b/pom.xml
index 7320661d5..192769fcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   org.apache.pulsar
   pulsar
 
-  1.22.0-incubating-SNAPSHOT
+  2.0.0-incubating-SNAPSHOT
 
   Pulsar
   Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml 
b/pulsar-broker-auth-athenz/pom.xml
index 80c22be81..48e78e87a 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
   
 
   pulsar-broker-auth-athenz
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 7a1d35606..666fec85a 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
   
 
   pulsar-broker-common
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index bda3037eb..efcfa76dc 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 9a703e0d0..6cd1f8957 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml
index da4f7785f..fb61c6fb8 100644
--- a/pulsar-checksum/pom.xml
+++ b/pulsar-checksum/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index 6a2f406fc..631012175 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index 1751af3ce..0cfc9dc49 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -26,7 +26,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-auth-athenz/pom.xml 
b/pulsar-client-auth-athenz/pom.xml
index 2b4406181..e7df0fc48 100644
--- a/pulsar-client-auth-athenz/pom.xml
+++ b/pulsar-client-auth-athenz/pom.xml
@@ -25,7 +25,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-kafka-compat/pom.xml 
b/pulsar-client-kafka-compat/pom.xml
index 5a31a43df..e43d94269 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -27,7 +27,7 @@
   
 org.apache.pulsar
 pulsar
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml
index 5b1d3d36b..6e546156d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml
@@ -27,7 +27,7 @@
   
 org.apache.pulsar
 pulsar-client-kafka-compat
-1.22.0-incubating-SNAPSHOT
+2.0.0-incubating-SNAPSHOT
 ..
   
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 7212c3936..906000372 100644
--- 

[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
mgodave commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167996154
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -22,6 +22,20 @@ package pulsar.proto;
 option java_package = "org.apache.pulsar.common.api.proto";
 option optimize_for = LITE_RUNTIME;
 
+message Schema {
+   required string name = 1;
+   required bytes version = 2;
+   required bytes schema_data = 7;
+repeated KeyValue properties = 8;
+}
 
 Review comment:
   I took a comment from the PIP email thread regarding the arbitrariness of 
the fields chosen in the original proposal. It occurred to me that schema type 
is equally arbitrary. First, adding a new type would require a binary 
protocol change, a newly cut release, and a coordinated client/server deployment. 
By removing the schema type as a hard-coded field we're making the choice of how 
to identify it an end-to-end problem. If, as you commented above, we need the 
client to send the actual schema (which is an oversight on my part), then we 
only need to compare the schemas. If we need to compare them semantically, then 
we can devise a server-side plugin scheme that lets us identify and compare 
"like" versions ("is this avro schema 'compatible' with this other avro 
schema?" for instance).
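
The server-side plugin idea above could look roughly like the sketch below. Everything in it is hypothetical (`SchemaCompatibilityCheck`, `SchemaChecks`, and the `"type"` property key are invented for illustration and are not part of this PR); it only shows byte-wise comparison as the default, with an optional semantic check selected from the schema's free-form properties.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical plugin interface: decides whether two schema payloads are "like" versions.
interface SchemaCompatibilityCheck {
    boolean isCompatible(byte[] existing, byte[] proposed);
}

// Hypothetical helper that picks a check based on schema properties.
final class SchemaChecks {
    private SchemaChecks() {}

    // Default check: schemas match only if their bytes are identical.
    static final SchemaCompatibilityCheck EXACT = (a, b) -> Arrays.equals(a, b);

    // Selects a plugin through a free-form property instead of a hard-coded protocol field.
    static boolean validate(Map<String, SchemaCompatibilityCheck> plugins,
                            Map<String, String> properties,
                            byte[] existing, byte[] proposed) {
        String type = properties.get("type"); // assumed property key
        SchemaCompatibilityCheck check =
                (type == null) ? EXACT : plugins.getOrDefault(type, EXACT);
        return check.isCompatible(existing, proposed);
    }
}
```

With this shape, supporting a new schema format would mean registering another plugin on the server rather than changing the binary protocol.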




[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)

2018-02-13 Thread GitBox
mgodave commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167996411
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -272,6 +289,8 @@ message CommandProducer {
 
 /// Add optional metadata key=value to this producer
 repeated KeyValue metadata= 6;
+
+   optional int64 schema_version = 7;
 
 Review comment:
   This seems to be an oversight on my part. Since this series of changes is 
concerned with a 'repository' I appear to have glossed over this point.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168006084
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (!isConnected()) {
+long opTimeoutMs = 
client.getConfiguration().getOperationTimeoutMs();
+Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+opTimeoutMs * 2, TimeUnit.MILLISECONDS,
+0 , TimeUnit.MILLISECONDS);
+
+long delayMs = backoff.firstBackoffTimeInMillis;;
+while (delayMs < opTimeoutMs && !isConnected()); {
+log.warn("[{}] [{}] Could not get connection while 
getLastMessageId -- Will try again in {} ms",
+topic, getHandlerName(), delayMs);
+try {
+Thread.sleep(delayMs);
+} catch (InterruptedException e) {
+return FutureUtil
+.failedFuture(new PulsarClientException
+.ConnectException("InterruptedException, could not 
connect"));
+}
+delayMs = backoff.next();
+}
+
+if (!isConnected()) {
+return FutureUtil.failedFuture(new PulsarClientException("Not 
connected to broker"));
+}
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v12.getNumber()) {
 
 Review comment:
   Nit: please wrap this in a method like:
   
   ```java
   boolean Commands.peerSupportsGetLastMessageId(ClientCnx cnx);
   ```
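
One possible shape for such a helper, as a self-contained sketch. The `int` parameter stands in for `cnx.getRemoteEndpointProtocolVersion()` from the diff, and the constant is assumed to equal `ProtocolVersion.v12.getNumber()`; `ProtocolSupport` is a hypothetical class name, not the actual `Commands` class.

```java
// Hypothetical stand-in for the suggested Commands helper.
final class ProtocolSupport {
    // Assumption: mirrors ProtocolVersion.v12.getNumber() used in the diff above.
    static final int GET_LAST_MESSAGE_ID_MIN_VERSION = 12;

    private ProtocolSupport() {}

    // Returns true when the peer's protocol version is new enough for GetLastMessageId.
    static boolean peerSupportsGetLastMessageId(int remoteProtocolVersion) {
        return remoteProtocolVersion >= GET_LAST_MESSAGE_ID_MIN_VERSION;
    }
}
```

Keeping the version comparison in one place means call sites read as a capability check rather than a raw version number.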




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168005589
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (!isConnected()) {
+long opTimeoutMs = 
client.getConfiguration().getOperationTimeoutMs();
 
 Review comment:
   This implementation will block the caller of an asynchronous method, which 
might be unexpected. One way to do the retries would be to have an internal 
method that reschedules itself asynchronously. For example, something 
like:
   
   ```java
   private CompletableFuture internalGetLastMessageIdAsync(Backoff 
backoff, long remainingTime) {
   if (connected) { 
  // write on socket and return future
   } else {
  // if time is not elapsed yet... 
  long nextDelay = backoff.next();
  executor.schedule(() -> {
 remainingTime -= (timeSpentSinceLastCall);
  internalGetLastMessageIdAsync(backoff, remainingTime);
  }, nextDelay, TimeUnit.MILLISECONDS);
   }
   }
   ```
   
   (Don't read too much into the previous example; I'm just trying to illustrate 
the basic idea.)
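
A self-contained sketch of the non-blocking retry idea, using only JDK classes. `AsyncRetry`, `retryUntilConnected`, and the doubling delay are assumptions made for illustration; this is not the Pulsar `Backoff` class or the eventual `ConsumerImpl` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pulsar: retries without blocking the caller.
final class AsyncRetry {
    private AsyncRetry() {}

    static <T> CompletableFuture<T> retryUntilConnected(
            ScheduledExecutorService executor,
            BooleanSupplier connected,
            Supplier<CompletableFuture<T>> operation,
            long delayMs,
            long remainingMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(executor, connected, operation, delayMs, remainingMs, result);
        return result;
    }

    private static <T> void attempt(
            ScheduledExecutorService executor,
            BooleanSupplier connected,
            Supplier<CompletableFuture<T>> operation,
            long delayMs,
            long remainingMs,
            CompletableFuture<T> result) {
        if (connected.getAsBoolean()) {
            // Connected: run the operation and forward its outcome to the caller's future.
            operation.get().whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
            return;
        }
        if (remainingMs <= 0) {
            result.completeExceptionally(
                    new TimeoutException("Not connected before the timeout elapsed"));
            return;
        }
        // Not connected yet: schedule the next attempt instead of sleeping on this thread.
        long wait = Math.max(1, Math.min(delayMs, remainingMs));
        executor.schedule(
                () -> attempt(executor, connected, operation, delayMs * 2, remainingMs - wait, result),
                wait, TimeUnit.MILLISECONDS);
    }
}
```

The remaining time budget is passed as an argument to the next attempt because a lambda cannot reassign a captured local variable.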




[GitHub] rdhabalia commented on issue #1223: Add a `backend` admin restful endpoint for query backend information

2018-02-13 Thread GitBox
rdhabalia commented on issue #1223: Add a `backend` admin restful endpoint for 
query backend information
URL: https://github.com/apache/incubator-pulsar/pull/1223#issuecomment-365474351
 
 
   > the purpose of this PR is to provide a method for application to know 
which zk, bk that a pulsar cluster is using.
   InternalConfiguration/InternalData ?
   
   I think we can add more configuration besides zk in the future, since this 
endpoint can surface internal configuration. So, should we keep the name 
generic, e.g. `InternalConfiguration`?




[GitHub] sijie commented on issue #1223: Add a `backend` admin restful endpoint for query backend information

2018-02-13 Thread GitBox
sijie commented on issue #1223: Add a `backend` admin restful endpoint for 
query backend information
URL: https://github.com/apache/incubator-pulsar/pull/1223#issuecomment-365475832
 
 
   Okay, will change it to `InternalConfiguration`.




[GitHub] saandrews opened a new pull request #1233: Added debug logs in MessageCrypto

2018-02-13 Thread GitBox
saandrews opened a new pull request #1233: Added debug logs in MessageCrypto
URL: https://github.com/apache/incubator-pulsar/pull/1233
 
 
   Added additional logging to help debugging.




[GitHub] hrsakai opened a new pull request #1234: Make max clients per topic/subscription configurable

2018-02-13 Thread GitBox
hrsakai opened a new pull request #1234: Make max clients per 
topic/subscription configurable
URL: https://github.com/apache/incubator-pulsar/pull/1234
 
 
   ### Motivation
   Currently, the broker does not limit the number of clients.
   Therefore, if a client implementation is incorrect, the number of clients may 
grow without bound.
   
   ### Modifications
   * Add max producers per topic settings to `ServerConfiguration`
   * Add max consumers per topic/subscription settings to `ServerConfiguration`
   
   ### Result
   We can set a maximum number of clients and prevent the number of clients from 
growing without bound.
   
   Next, we plan to let namespaces have custom values through the `REST API` 
interface.
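
A minimal sketch of how such a limit could be enforced, assuming a per-topic counter and a setting where 0 means "no limit". `ProducerLimiter`, `tryAdd`, and the 0-means-unlimited convention are assumptions for illustration and do not describe the broker code in this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-topic guard; the real broker implementation may differ.
final class ProducerLimiter {
    private final int maxProducersPerTopic; // assumption: 0 disables the limit
    private final AtomicInteger current = new AtomicInteger();

    ProducerLimiter(int maxProducersPerTopic) {
        this.maxProducersPerTopic = maxProducersPerTopic;
    }

    // Returns true and counts the producer if the topic is still under its limit.
    boolean tryAdd() {
        while (true) {
            int n = current.get();
            if (maxProducersPerTopic > 0 && n >= maxProducersPerTopic) {
                return false;
            }
            if (current.compareAndSet(n, n + 1)) {
                return true;
            }
        }
    }

    // Call when a producer disconnects; never drops below zero.
    void remove() {
        current.updateAndGet(n -> Math.max(0, n - 1));
    }

    int count() {
        return current.get();
    }
}
```

The compare-and-set loop keeps the check and the increment atomic, so two producers connecting at the same time cannot both take the last slot.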




[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168036825
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture<MessageId> getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (!isConnected()) {
+long opTimeoutMs = 
client.getConfiguration().getOperationTimeoutMs();
 
 Review comment:
   Thanks, will change this.




[GitHub] rdhabalia opened a new pull request #1235: Add non-persistent topic stats separately in brokers-stat

2018-02-13 Thread GitBox
rdhabalia opened a new pull request #1235: Add non-persistent topic stats 
separately in brokers-stat
URL: https://github.com/apache/incubator-pulsar/pull/1235
 
 
   ### Motivation
   
   Right now, broker-stats reports both persistent and non-persistent topic 
stats under the `persistent` topic section. We need them in separate sections 
to manage them operationally.
   
   ### Modifications
   
   Add non-persistent topics under a separate section.
   
   ### Result
   
   Broker-stats will show non-persistent topics in a separate section.
   




[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168061308
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture<MessageId> getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
+0 , TimeUnit.MILLISECONDS);
+CompletableFuture<MessageId> getLastMessageIdFuture = new 
CompletableFuture<>();
+
+internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
+return getLastMessageIdFuture;
+}
+
+private void internalGetLastMessageIdAsync(final Backoff backoff,
+   final AtomicLong remainingTime,
+   CompletableFuture<MessageId> 
future) {
+if (isConnected()) {
 
 Review comment:
   Thanks, will change it.




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054738
 
 

 ##
 File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
 ##
 @@ -952,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean 
authoritative, String orig
 lookupBroker.recycle();
 return res;
 }
+
+public static boolean peerSupportsGetLastMessageId() {
+return getCurrentProtocolVersion() >= ProtocolVersion.v12.getNumber();
 
 Review comment:
   This only checks our own protocol version (which is always the "latest" 
from when we compiled the protobuf). We need to check the version of the other 
side instead, in this case the broker. 
   
   It should look like this: 
   
   ```java
   public static boolean peerSupportsGetLastMessageId(ClientCnx cnx) {
       return cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v12.getNumber();
   }
   ```




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054861
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -145,6 +151,9 @@
 this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
 this.readCompacted = conf.getReadCompacted();
 
+this.getLastIdExecutor = Executors
 
 Review comment:
   This would create one thread per consumer; we should reuse the executor 
that is already available from `PulsarClientImpl`.
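
A minimal sketch of the idea, using only `java.util.concurrent`. The classes `SketchClient` and `SketchConsumer` are hypothetical stand-ins, not the real `PulsarClientImpl` or `ConsumerImpl`; the sketch only shows consumers borrowing one executor owned by the client instead of each creating its own thread:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the client: it owns the only executor.
class SketchClient implements AutoCloseable {
    private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor();

    ExecutorService executor() {
        return sharedExecutor;
    }

    @Override
    public void close() {
        sharedExecutor.shutdown();
    }
}

// Hypothetical stand-in for a consumer: it borrows the client's executor.
class SketchConsumer {
    private final ExecutorService executor;

    SketchConsumer(SketchClient client) {
        this.executor = client.executor();
    }

    // Runs a task on the shared executor and reports which thread ran it.
    CompletableFuture<String> workerThreadName() {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
    }
}

class SharedExecutorDemo {
    // Counts the distinct worker threads used by the given number of consumers.
    static int distinctThreads(int consumers) {
        Set<String> names = new HashSet<>();
        try (SketchClient client = new SketchClient()) {
            for (int i = 0; i < consumers; i++) {
                names.add(new SketchConsumer(client).workerThreadName().join());
            }
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct worker threads: " + distinctThreads(100));
    }
}
```

With the per-consumer approach, the thread count would grow with the number of consumers; here it stays fixed by the client's executor regardless of how many consumers are created.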




[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168059642
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+return true;
+}
+
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture<MessageId> getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
+0 , TimeUnit.MILLISECONDS);
+CompletableFuture<MessageId> getLastMessageIdFuture = new 
CompletableFuture<>();
+
+internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
+return getLastMessageIdFuture;
+}
+
+private void internalGetLastMessageIdAsync(final Backoff backoff,
+   final AtomicLong remainingTime,
+   CompletableFuture<MessageId> 
future) {
+if (isConnected()) {
 
 Review comment:
   I missed this before, but we should make sure the connection doesn't change 
while we're executing this method. 
   
   `isConnected()` checks the current connection, but that might change by the 
time we ask for `cnx()` a few lines below. 
   
   We need to first get a reference to the `ClientCnx` and use that throughout 
the method. 
   
   ```java
   ClientCnx cnx = cnx();
   
   if (cnx != null) {
  // check cnx.getRemoteEndpointProtocolVersion();
  cnx.sendGetLastMessageId()... 
   }
   ```
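
A self-contained sketch of the same pattern, assuming the connection lives in an `AtomicReference` that another thread may swap at any time. `SketchCnx`, `SketchHandler`, and `trySend` are hypothetical names; the threshold of 12 only mirrors the `ProtocolVersion.v12` check quoted elsewhere in this review:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ClientCnx; only the peer's protocol version matters here.
final class SketchCnx {
    final int remoteProtocolVersion;

    SketchCnx(int remoteProtocolVersion) {
        this.remoteProtocolVersion = remoteProtocolVersion;
    }
}

// Hypothetical stand-in for the consumer's connection handling.
class SketchHandler {
    static final int MIN_VERSION = 12; // illustrative threshold

    private final AtomicReference<SketchCnx> current = new AtomicReference<>();

    void setConnection(SketchCnx cnx) {
        current.set(cnx);
    }

    SketchCnx cnx() {
        return current.get();
    }

    // Reads the connection exactly once; every later step uses the local copy,
    // so a concurrent reconnect cannot change it halfway through the method.
    Optional<String> trySend() {
        SketchCnx cnx = cnx();
        if (cnx == null) {
            return Optional.empty(); // not connected; the caller may retry later
        }
        if (cnx.remoteProtocolVersion < MIN_VERSION) {
            return Optional.of("unsupported");
        }
        return Optional.of("sent on v" + cnx.remoteProtocolVersion);
    }
}

class CaptureCnxDemo {
    public static void main(String[] args) {
        SketchHandler handler = new SketchHandler();
        System.out.println(handler.trySend());
        handler.setConnection(new SketchCnx(12));
        System.out.println(handler.trySend());
    }
}
```

Calling `isConnected()` and then `cnx()` separately would be two reads of shared state; taking one local reference turns the null check and the send into operations on the same object.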




[incubator-pulsar] branch master updated: Added debug logs in MessageCrypto (#1233)

2018-02-13 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fbb42e7  Added debug logs in MessageCrypto (#1233)
fbb42e7 is described below

commit fbb42e711aa9044ec5a1b30f448173432c1d805a
Author: Andrews 
AuthorDate: Tue Feb 13 18:12:13 2018 -0800

Added debug logs in MessageCrypto (#1233)
---
 pulsar-client-cpp/lib/MessageCrypto.cc | 101 +
 pulsar-client-cpp/lib/MessageCrypto.h  |   1 +
 2 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc 
b/pulsar-client-cpp/lib/MessageCrypto.cc
index 26f1b48..0cc5dec 100644
--- a/pulsar-client-cpp/lib/MessageCrypto.cc
+++ b/pulsar-client-cpp/lib/MessageCrypto.cc
@@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& 
privateKeyStr) {
 bool MessageCrypto::getDigest(const std::string& keyName, const void* input, 
unsigned int inputLen,
   unsigned char keyDigest[], unsigned int& 
digestLen) {
 if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) {
-LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + 
keyName);
+LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << 
keyName);
 return false;
 }
 
 digestLen = 0;
 if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) {
-LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName);
+LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << 
keyName);
 return false;
 }
 
 if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) {
-LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + 
keyName);
+LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << 
keyName);
 return false;
 }
 
@@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() {
 }
 }
 
+std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t 
len) {
+static const char* hexVals = "0123456789ABCDEF";
+
+std::string outHex;
+outHex.reserve(2 * len + 2);
+outHex.push_back('0');
+outHex.push_back('x');
+for (size_t i = 0; i < len; ++i) {
+const unsigned char c = inputStr[i];
+outHex.push_back(hexVals[c >> 4]);
+outHex.push_back(hexVals[c & 15]);
+}
+return outHex;
+}
+
 Result MessageCrypto::addPublicKeyCipher(std::set& keyNames,
  const CryptoKeyReaderPtr keyReader) {
 Lock lock(mutex_);
@@ -141,7 +156,7 @@ Result 
MessageCrypto::addPublicKeyCipher(std::set& keyNames,
 
 Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const 
CryptoKeyReaderPtr keyReader) {
 if (keyName.empty()) {
-LOG_ERROR(logCtx_ + "Keyname is empty ");
+LOG_ERROR(logCtx_ << "Keyname is empty ");
 return ResultCryptoError;
 }
 
@@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const 
std::string& keyName, const Crypt
 EncryptionKeyInfo keyInfo;
 Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo);
 if (result != ResultOk) {
-LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " 
+ keyName);
+LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key 
" << keyName);
 return result;
 }
 
 RSA* pubKey = loadPublicKey(keyInfo.getKey());
 if (pubKey == NULL) {
-LOG_ERROR(logCtx_ + "Failed to load public key " + keyName);
+LOG_ERROR(logCtx_ << "Failed to load public key " << keyName);
 return ResultCryptoError;
 }
+LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully.");
 
 int inSize = RSA_size(pubKey);
 boost::scoped_array encryptedKey(new unsigned char[inSize]);
@@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& 
keyName, const Crypt
 RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), 
pubKey, RSA_PKCS1_OAEP_PADDING);
 
 if (inSize != outSize) {
-LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key 
length for key " + keyName);
+LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key 
length for key " << keyName);
 return ResultCryptoError;
 }
 std::string encryptedKeyStr(reinterpret_cast(encryptedKey.get()), 
inSize);
@@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const 
std::string& keyName, const Crypt
 eki->setMetadata(keyInfo.getMetadata());
 
 encryptedDataKeyMap_.insert(std::make_pair(keyName, eki));
+if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) {
+std::string strHex = stringToHex(encryptedKeyStr, 
encryptedKeyStr.size());
+LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName
+  

[GitHub] rdhabalia closed pull request #1233: Added debug logs in MessageCrypto

2018-02-13 Thread GitBox
rdhabalia closed pull request #1233: Added debug logs in MessageCrypto
URL: https://github.com/apache/incubator-pulsar/pull/1233
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc 
b/pulsar-client-cpp/lib/MessageCrypto.cc
index 26f1b484e..0cc5decac 100644
--- a/pulsar-client-cpp/lib/MessageCrypto.cc
+++ b/pulsar-client-cpp/lib/MessageCrypto.cc
@@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& 
privateKeyStr) {
 bool MessageCrypto::getDigest(const std::string& keyName, const void* input, 
unsigned int inputLen,
   unsigned char keyDigest[], unsigned int& 
digestLen) {
 if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) {
-LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + 
keyName);
+LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << 
keyName);
 return false;
 }
 
 digestLen = 0;
 if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) {
-LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName);
+LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << 
keyName);
 return false;
 }
 
 if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) {
-LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + 
keyName);
+LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << 
keyName);
 return false;
 }
 
@@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() {
 }
 }
 
+std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t 
len) {
+static const char* hexVals = "0123456789ABCDEF";
+
+std::string outHex;
+outHex.reserve(2 * len + 2);
+outHex.push_back('0');
+outHex.push_back('x');
+for (size_t i = 0; i < len; ++i) {
+const unsigned char c = inputStr[i];
+outHex.push_back(hexVals[c >> 4]);
+outHex.push_back(hexVals[c & 15]);
+}
+return outHex;
+}
+
 Result MessageCrypto::addPublicKeyCipher(std::set& keyNames,
  const CryptoKeyReaderPtr keyReader) {
 Lock lock(mutex_);
@@ -141,7 +156,7 @@ Result 
MessageCrypto::addPublicKeyCipher(std::set& keyNames,
 
 Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const 
CryptoKeyReaderPtr keyReader) {
 if (keyName.empty()) {
-LOG_ERROR(logCtx_ + "Keyname is empty ");
+LOG_ERROR(logCtx_ << "Keyname is empty ");
 return ResultCryptoError;
 }
 
@@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const 
std::string& keyName, const Crypt
 EncryptionKeyInfo keyInfo;
 Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo);
 if (result != ResultOk) {
-LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " 
+ keyName);
+LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key 
" << keyName);
 return result;
 }
 
 RSA* pubKey = loadPublicKey(keyInfo.getKey());
 if (pubKey == NULL) {
-LOG_ERROR(logCtx_ + "Failed to load public key " + keyName);
+LOG_ERROR(logCtx_ << "Failed to load public key " << keyName);
 return ResultCryptoError;
 }
+LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully.");
 
 int inSize = RSA_size(pubKey);
 boost::scoped_array encryptedKey(new unsigned char[inSize]);
@@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& 
keyName, const Crypt
 RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), 
pubKey, RSA_PKCS1_OAEP_PADDING);
 
 if (inSize != outSize) {
-LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key 
length for key " + keyName);
+LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key 
length for key " << keyName);
 return ResultCryptoError;
 }
 std::string encryptedKeyStr(reinterpret_cast(encryptedKey.get()), 
inSize);
@@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const 
std::string& keyName, const Crypt
 eki->setMetadata(keyInfo.getMetadata());
 
 encryptedDataKeyMap_.insert(std::make_pair(keyName, eki));
+if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) {
+std::string strHex = stringToHex(encryptedKeyStr, 
encryptedKeyStr.size());
+LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName
+  << ". Encrypted key size = " << 
encryptedKeyStr.size() << ", value = " << strHex);
+}
 return ResultOk;
 }
 
@@ -212,7 +233,7 @@ bool MessageCrypto::encrypt(std::set& encKeys, 
const CryptoKeyReade
 keyInfoIter = encryptedDataKeyMap_.find(keyName);
 
   

[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085822
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ActiveConsumerListener {
 
 Review comment:
   Sure, that makes sense. 
   
   One other thought: we could make this listener more general, so that we 
can reuse it in the future for other kinds of notifications. 
   
   Something like `ConsumerEventsListener`, so that the name is more neutral? 
What do you think?
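
A rough sketch of what a more neutral listener could look like, assuming Java 8 default methods. The interface name `SketchConsumerEventsListener` and the methods `becameActive` and `becameInactive` are hypothetical and only illustrate how further event types could be added later without breaking existing implementations:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical neutral listener: each event is a default no-op method, so
// adding a new event later does not force existing implementations to change.
interface SketchConsumerEventsListener {
    default void becameActive(String consumerName, int partitionId) {
    }

    default void becameInactive(String consumerName, int partitionId) {
    }
}

// Example implementation that only cares about one of the events.
class RecordingListener implements SketchConsumerEventsListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void becameActive(String consumerName, int partitionId) {
        events.add("active:" + consumerName + ":" + partitionId);
    }
}

class EventsListenerDemo {
    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.becameActive("consumer-a", 0);
        listener.becameInactive("consumer-a", 0); // falls back to the default no-op
        System.out.println(listener.events);
    }
}
```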




[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085850
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 ##
 @@ -127,6 +129,33 @@ public ConsumerConfiguration 
setMessageListener(MessageListener messageListener)
 return this;
 }
 
+/**
+ * @return this configured {@link ActiveConsumerListener} for the consumer.
+ * @see #setActiveConsumerListener(ActiveConsumerListener)
+ * @since 1.22.0
 
 Review comment:
   since 2.0




[GitHub] merlimat commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
merlimat commented on issue #1156: Introduce ActiveConsumerListener for 
realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-365509341
 
 
   @sijie Change looks good, though there's still the issue at 
https://github.com/apache/incubator-pulsar/pull/1156/files#r167413443 . I think 
it is using the `consumerId` of the active consumer to decide whether other 
consumers are active/inactive, but we cannot compare `consumerId` across consumers.
   
   The other comment was about making the Listener interface name more neutral 
so that we can add more event handlers, if (and when) the need arises, without 
having to add an additional listener.




[GitHub] merlimat closed issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData

2018-02-13 Thread GitBox
merlimat closed issue #1211: Intermittent test failure int 
ProxyForwardAuthDataTest.testForwardAuthData
URL: https://github.com/apache/incubator-pulsar/issues/1211
 
 
   




[GitHub] merlimat commented on issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData

2018-02-13 Thread GitBox
merlimat commented on issue #1211: Intermittent test failure int 
ProxyForwardAuthDataTest.testForwardAuthData
URL: 
https://github.com/apache/incubator-pulsar/issues/1211#issuecomment-365510063
 
 
   Fixed in #1230 




[GitHub] merlimat commented on issue #1228: Force to pull docker build image to regenerate PulsarApi.java generat?

2018-02-13 Thread GitBox
merlimat commented on issue #1228: Force to pull docker build image to 
regenerate PulsarApi.java generat?
URL: https://github.com/apache/incubator-pulsar/pull/1228#issuecomment-365509563
 
 
   retest this please




[incubator-pulsar] branch master updated: Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066)

2018-02-13 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7404952  Issue 937: add CommandGetLastMessageId to make reader know 
the end of topic (#1066)
7404952 is described below

commit 74049522a1d97e1171c9088acd638b426b6de015
Author: Jia Zhai 
AuthorDate: Tue Feb 13 21:21:29 2018 -0800

Issue 937: add CommandGetLastMessageId to make reader know the end of topic 
(#1066)

* add CommandGetLastMessageId to getlastMessageId of topic

* rebase master, change following comments

* add partition index in GetLastMessageIdResponse

* fix rebase error

* bump proot version to v11

* change following comments

* change following comments2

* change following comments3

* change following comments

* get cnx() first
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |7 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |9 +-
 .../apache/pulsar/broker/service/ServerCnx.java|   52 +-
 .../org/apache/pulsar/broker/service/Topic.java|3 +
 .../service/nonpersistent/NonPersistentTopic.java  |6 +
 .../broker/service/persistent/PersistentTopic.java |5 +
 .../apache/pulsar/client/api/TopicReaderTest.java  |  107 +-
 .../java/org/apache/pulsar/client/api/Reader.java  |   10 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   37 +
 .../apache/pulsar/client/impl/ConsumerImpl.java|  131 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   11 +-
 .../pulsar/client/util/ExecutorProvider.java   |2 +-
 .../org/apache/pulsar/common/api/Commands.java |   41 +-
 .../apache/pulsar/common/api/PulsarDecoder.java|   20 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 1026 
 pulsar-common/src/main/proto/PulsarApi.proto   |   47 +-
 16 files changed, 1457 insertions(+), 57 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index e13664c..9149bb9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -334,4 +334,11 @@ public interface ManagedLedger {
  * @param config
  */
 void setConfig(ManagedLedgerConfig config);
+
+/**
+ * Gets last confirmed entry of the managed ledger.
+ *
+ * @return the last confirmed entry id
+ */
+Position getLastConfirmedEntry();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 80a0bbe..89d3476 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1062,7 +1062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 Futures.waitForAll(futures).thenRun(() -> {
 callback.closeComplete(ctx);
 }).exceptionally(exception -> {
-
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
+
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()),
 ctx);
 return null;
 });
 }
@@ -1282,7 +1282,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }).exceptionally(ex -> {
 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, opReadEntry.readPosition,
 ex.getMessage());
-
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), 
opReadEntry.ctx);
+
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 opReadEntry.ctx);
 return null;
 });
 }
@@ -1351,7 +1351,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 entryCache.asyncReadEntry(ledger, position, callback, ctx);
 }).exceptionally(ex -> {
 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, position, ex.getMessage());
-
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
+
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
 return null;
 });
 }
@@ -2173,7 +2173,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 return pendingAddEntries.size();

[GitHub] merlimat closed pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-13 Thread GitBox
merlimat closed pull request #1066: Issue 937: add CommandGetLastMessageId to 
make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index e13664c27..9149bb9f9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -334,4 +334,11 @@
  * @param config
  */
 void setConfig(ManagedLedgerConfig config);
+
+/**
+ * Gets last confirmed entry of the managed ledger.
+ *
+ * @return the last confirmed entry id
+ */
+Position getLastConfirmedEntry();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 80a0bbed8..89d3476ea 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1062,7 +1062,7 @@ private void closeAllCursors(CloseCallback callback, 
final Object ctx) {
 Futures.waitForAll(futures).thenRun(() -> {
 callback.closeComplete(ctx);
 }).exceptionally(exception -> {
-
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
+
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()),
 ctx);
 return null;
 });
 }
@@ -1282,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
 }).exceptionally(ex -> {
 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, opReadEntry.readPosition,
 ex.getMessage());
-
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), 
opReadEntry.ctx);
+
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 opReadEntry.ctx);
 return null;
 });
 }
@@ -1351,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, 
ReadEntryCallback callback, Object ct
 entryCache.asyncReadEntry(ledger, position, callback, ctx);
 }).exceptionally(ex -> {
 log.error("[{}] Error opening ledger for reading at position 
{} - {}", name, position, ex.getMessage());
-
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
+
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
 return null;
 });
 }
@@ -2173,7 +2173,8 @@ public int getPendingAddEntriesCount() {
 return pendingAddEntries.size();
 }
 
-public PositionImpl getLastConfirmedEntry() {
+@Override
+public Position getLastConfirmedEntry() {
 return lastConfirmedEntry;
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0b8d51280..cb473c875 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -31,10 +31,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
@@ -59,6 +57,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
@@ -110,7 +109,7 @@
 private String originalPrincipal = null;
 private Set<String> proxyRoles;
 private boolean authenticateOriginalAuthData;
-
+
 enum State {
 Start, Connected, Failed
 }
@@ -192,8 +191,8 @@ public void 

[GitHub] merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader

2018-02-13 Thread GitBox
merlimat commented on issue #1089: PIP-12 Introduce builder for creating 
Producer Consumer Reader
URL: https://github.com/apache/incubator-pulsar/pull/1089#issuecomment-365505133
 
 
   This is ready for review. Once this is finalized and merged, I'll start 
converting all the code to use the new API.




[GitHub] merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader

2018-02-13 Thread GitBox
merlimat commented on issue #1089: PIP-12 Introduce builder for creating 
Producer Consumer Reader
URL: https://github.com/apache/incubator-pulsar/pull/1089#issuecomment-365505550
 
 
   >  Once we have subscription for topics and pattern, we need add them in.
   
   Yes, the idea was to have different ways to specify the topics to subscribe to: 
   ```java
   // Single topic
   client.newConsumer().topic(MY_TOPIC).subscriptionName(SUB).build();
   
   // List of topics
   List<String> myList = ...;
   client.newConsumer().topics(myList).subscriptionName(SUB).build();
   
   // Regex
   client.newConsumer().topicsRegex("test.*").subscriptionName(SUB).build();
   ```
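   
   To make the three forms above concrete, here is a minimal, self-contained 
   sketch of how such a builder could collect and validate its inputs. 
   `ConsumerSpecBuilder` and its `build()` return value are hypothetical 
   stand-ins for illustration only; this is not the Pulsar client API, and the 
   "exactly one of topics or regex" rule is an assumption, not something the 
   proposal states.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.regex.Pattern;
   
   // Hypothetical stand-in for the proposed builder; NOT the actual Pulsar API.
   final class ConsumerSpecBuilder {
       private final List<String> topics = new ArrayList<>();
       private Pattern topicsRegex;
       private String subscriptionName;
   
       // Single topic: appended to the explicit topic list.
       ConsumerSpecBuilder topic(String topic) {
           topics.add(topic);
           return this;
       }
   
       // List of topics: all names are appended to the explicit topic list.
       ConsumerSpecBuilder topics(List<String> names) {
           topics.addAll(names);
           return this;
       }
   
       // Regex: compiled eagerly so an invalid pattern fails at this call.
       ConsumerSpecBuilder topicsRegex(String regex) {
           topicsRegex = Pattern.compile(regex);
           return this;
       }
   
       ConsumerSpecBuilder subscriptionName(String name) {
           subscriptionName = name;
           return this;
       }
   
       // Validates the collected settings and returns a description of the
       // subscription. Assumption: exactly one of topics/regex must be set.
       String build() {
           if (subscriptionName == null) {
               throw new IllegalStateException("subscription name is required");
           }
           boolean hasTopics = !topics.isEmpty();
           boolean hasRegex = topicsRegex != null;
           if (hasTopics == hasRegex) {
               throw new IllegalStateException("set either topics or a regex");
           }
           return subscriptionName + " -> "
                   + (hasTopics ? topics.toString() : "regex:" + topicsRegex.pattern());
       }
   }
   ```
   
   Keeping validation in `build()` means each setter stays a plain fluent 
   call, and a misconfigured consumer is rejected in one place.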

