[GitHub] ivankelly commented on a change in pull request #1205: Algorithm to find start point of compacted ledger

2018-02-09 Thread GitBox
ivankelly commented on a change in pull request #1205: Algorithm to find start 
point of compacted ledger
URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167196940
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 ##
 @@ -18,10 +18,111 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.google.common.cache.Cache;
+import com.google.common.collect.ComparisonChain;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
+final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+
 @Override
 public void newCompactedLedger(Position p, long compactedLedgerId) {}
+
+static CompletableFuture findStartPoint(LedgerHandle lh, 
PositionImpl p,
+  
Cache cache) {
+CompletableFuture promise = new CompletableFuture<>();
+findStartPointLoop(lh, p, 0, lh.getLastAddConfirmed(), promise, cache);
+return promise;
+}
+
+private static void findStartPointLoop(LedgerHandle lh, PositionImpl p, 
long start, long end,
+   CompletableFuture promise,
+   Cache 
cache) {
+long midpoint = start + ((end - start) / 2);
+
+CompletableFuture startEntry = readOneMessageId(lh, 
start, cache);
+CompletableFuture middleEntry = readOneMessageId(lh, 
midpoint, cache);
+CompletableFuture endEntry = readOneMessageId(lh, end, 
cache);
+
+CompletableFuture.allOf(startEntry, middleEntry, 
endEntry).whenComplete(
+(v, exception) -> {
+if (exception != null) {
+promise.completeExceptionally(exception);
+}
+try {
+if (comparePositionAndMessageId(p, startEntry.get()) < 
0) {
+promise.complete(start);
+} else if (comparePositionAndMessageId(p, 
middleEntry.get()) < 0) {
+findStartPointLoop(lh, p, start, midpoint, 
promise, cache);
+} else if (comparePositionAndMessageId(p, 
endEntry.get()) < 0) {
+findStartPointLoop(lh, p, midpoint + 1, end, 
promise, cache);
+} else {
+promise.complete(NEWER_THAN_COMPACTED);
+}
+} catch (InterruptedException ie) {
+// should never happen as all should have been 
completed
+Thread.currentThread().interrupt();
+log.error("Interrupted waiting on futures which should 
have completed", ie);
+} catch (ExecutionException e) {
+// shouldn't happen, allOf should have given us the 
exception
+promise.completeExceptionally(e);
+}
+});
+}
+
+private static CompletableFuture 
readOneMessageId(LedgerHandle lh, long entryId,
 
 Review comment:
   Ah, I wasn't aware of Caffeine. Changed to use it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
rdhabalia commented on a change in pull request #1208: Add 
hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167159624
 
 

 ##
 File path: 
pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
 ##
 @@ -0,0 +1,82 @@
+Certificate:
 
 Review comment:
   I checked the rat-check plugin in the 
[pom](https://github.com/apache/incubator-pulsar/blob/master/pom.xml#L760), 
which ignores all *.cert/*.key files but doesn't exclude `*.pem`, so I will add 
that too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
rdhabalia commented on a change in pull request #1208: Add 
hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167158945
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 ##
 @@ -356,4 +357,21 @@ public void 
setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRe
 this.maxNumberOfRejectedRequestPerConnection = 
maxNumberOfRejectedRequestPerConnection;
 }
 
+public boolean isTlsHostnameVerificationEnable() {
+return tlsHostnameVerificationEnable;
+}
+
+/**
+ * It allows to validate hostname verification when client connects to 
broker over tls. It validates incoming x509
+ * certificate and matches provided hostname(CN/SAN) with expected 
broker's host name. It follows RFC 2818, 3.1. Server
+ * Identity hostname verification.
+ * 
+ * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+ * 
+ * @param tlsHostnameVerificationEnable
+ */
+public void setTlsHostnameVerificationEnable(boolean 
tlsHostnameVerificationEnable) {
 
 Review comment:
   Umm.. actually one can use `allowInsecureConnection` in a non-prod env, which 
makes the client trust all X.509 certificates without any verification, using 
`InsecureTrustManagerFactory.java`. However, hostname verification can be 
applied on top of a secured connection as well.
   
   >  that is what the HTTP client is following anyway.
   
   Actually even HTTP client also provides separate API to [set 
hostNameVerifier](http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/client/HttpClientBuilder.html#setHostnameVerifier(org.apache.http.conn.ssl.X509HostnameVerifier))
   
   So, as the two configs serve different purposes, wouldn't it be better to 
give flexibility when configuring them?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1205: Algorithm to find start 
point of compacted ledger
URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167300931
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 ##
 @@ -18,10 +18,109 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.ComparisonChain;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
+final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+
 @Override
 public void newCompactedLedger(Position p, long compactedLedgerId) {}
+
+static CompletableFuture findStartPoint(PositionImpl p,
+  long lastEntryId,
+  
AsyncLoadingCache cache) {
+CompletableFuture promise = new CompletableFuture<>();
+findStartPointLoop(p, 0, lastEntryId, promise, cache);
+return promise;
+}
+
+private static void findStartPointLoop(PositionImpl p, long start, long 
end,
+   CompletableFuture promise,
+   
AsyncLoadingCache cache) {
+long midpoint = start + ((end - start) / 2);
+
+CompletableFuture startEntry = cache.get(start);
+CompletableFuture middleEntry = cache.get(midpoint);
+CompletableFuture endEntry = cache.get(end);
+
+CompletableFuture.allOf(startEntry, middleEntry, 
endEntry).whenComplete(
+(v, exception) -> {
+if (exception != null) {
+promise.completeExceptionally(exception);
+}
+try {
+if (comparePositionAndMessageId(p, startEntry.get()) < 
0) {
 
 Review comment:
   Also, if doing like : 
   
   ```java
   CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(() -> {
   // Success case
   }).exceptionally(ex -> {
 promise.completeExceptionally(ex);
 return null;
   });
   ```
   
   The `exceptionally` block will also be executed if any exception is thrown 
in the `thenRun()` code, in addition to the async operation failing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat opened a new pull request #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy

2018-02-09 Thread GitBox
merlimat opened a new pull request #1210: Cancel keep-alive timer task after 
the proxy switch to TCP proxy
URL: https://github.com/apache/incubator-pulsar/pull/1210
 
 
   ### Motivation
   
   After initial handshake, the Pulsar proxy switches to TCP proxy mode, by 
just copying buffers between the 2 connections and avoiding all parsing. 
   
   When that happens, for keep-alive messages, we rely on what the client and 
broker are exchanging, so they will be able to detect a stale connection 
client-proxy or proxy-broker. 
   Currently, we're not removing the keep-alive timer in the proxy, so it's 
forcefully closing the connection after 60s, even though client and broker are 
perfectly fine.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1208: Add hostname-verification 
at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167302047
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 ##
 @@ -356,4 +357,21 @@ public void 
setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRe
 this.maxNumberOfRejectedRequestPerConnection = 
maxNumberOfRejectedRequestPerConnection;
 }
 
+public boolean isTlsHostnameVerificationEnable() {
+return tlsHostnameVerificationEnable;
+}
+
+/**
+ * It allows to validate hostname verification when client connects to 
broker over tls. It validates incoming x509
+ * certificate and matches provided hostname(CN/SAN) with expected 
broker's host name. It follows RFC 2818, 3.1. Server
+ * Identity hostname verification.
+ * 
+ * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+ * 
+ * @param tlsHostnameVerificationEnable
+ */
+public void setTlsHostnameVerificationEnable(boolean 
tlsHostnameVerificationEnable) {
 
 Review comment:
   Sure, we should just make sure the same checks are done in http vs protobuf 
for the same config


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat opened a new issue #1211: Intermittent test failure in ProxyForwardAuthDataTest.testForwardAuthData

2018-02-09 Thread GitBox
merlimat opened a new issue #1211: Intermittent test failure in 
ProxyForwardAuthDataTest.testForwardAuthData
URL: https://github.com/apache/incubator-pulsar/issues/1211
 
 
   
https://builds.apache.org/job/pulsar-pull-request/org.apache.pulsar$pulsar-proxy/1581/testReport/junit/org.apache.pulsar.proxy.server/ProxyForwardAuthDataTest/testForwardAuthData/
   
   ```
   Error Message
   
   java.nio.channels.ClosedChannelException
   
   Stacktrace
   
   org.apache.pulsar.client.api.PulsarClientException: 
java.nio.channels.ClosedChannelException
at 
org.apache.pulsar.client.impl.PulsarClientImpl.subscribe(PulsarClientImpl.java:215)
at 
org.apache.pulsar.client.impl.PulsarClientImpl.subscribe(PulsarClientImpl.java:202)
at 
org.apache.pulsar.proxy.server.ProxyForwardAuthDataTest.testForwardAuthData(ProxyForwardAuthDataTest.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
rdhabalia commented on a change in pull request #1208: Add 
hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167332606
 
 

 ##
 File path: pulsar-client-shaded/pom.xml
 ##
 @@ -81,6 +81,8 @@
   org.apache.pulsar:pulsar-checksum
   net.jpountz.lz4:lz4
   com.yahoo.datasketches:sketches-core
+  org.apache.httpcomponents:httpclient
+  commons-logging:commons-logging
 
 Review comment:
   I see..let me add it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1200: Add pluggable authorization mechanism

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1200: Add pluggable 
authorization mechanism
URL: https://github.com/apache/incubator-pulsar/pull/1200#discussion_r167298488
 
 

 ##
 File path: conf/proxy.conf
 ##
 @@ -49,6 +49,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider name list, which is comma separated list of class 
names
+authorizationProviders=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
 
 Review comment:
   Ok, it is always easier to add it if needed than to remove it later
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1205: Algorithm to find start 
point of compacted ledger
URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167300225
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 ##
 @@ -18,10 +18,109 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.ComparisonChain;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
+final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+
 @Override
 public void newCompactedLedger(Position p, long compactedLedgerId) {}
+
+static CompletableFuture findStartPoint(PositionImpl p,
+  long lastEntryId,
+  
AsyncLoadingCache cache) {
+CompletableFuture promise = new CompletableFuture<>();
+findStartPointLoop(p, 0, lastEntryId, promise, cache);
+return promise;
+}
+
+private static void findStartPointLoop(PositionImpl p, long start, long 
end,
+   CompletableFuture promise,
+   
AsyncLoadingCache cache) {
+long midpoint = start + ((end - start) / 2);
+
+CompletableFuture startEntry = cache.get(start);
+CompletableFuture middleEntry = cache.get(midpoint);
+CompletableFuture endEntry = cache.get(end);
+
+CompletableFuture.allOf(startEntry, middleEntry, 
endEntry).whenComplete(
+(v, exception) -> {
+if (exception != null) {
+promise.completeExceptionally(exception);
+}
+try {
+if (comparePositionAndMessageId(p, startEntry.get()) < 
0) {
 
 Review comment:
   Nit: you could use `.join()` instead of `get()`, which throws an unchecked 
exception, since in this case we're not expecting any.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1208: Add hostname-verification 
at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167332300
 
 

 ##
 File path: pulsar-client-shaded/pom.xml
 ##
 @@ -81,6 +81,8 @@
   org.apache.pulsar:pulsar-checksum
   net.jpountz.lz4:lz4
   com.yahoo.datasketches:sketches-core
+  org.apache.httpcomponents:httpclient
+  commons-logging:commons-logging
 
 Review comment:
   Can you also add these in `pulsar-broker-shaded/pom.xml` and 
`pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml`? I know it's painful, 
but I couldn't find a reliable way to do the shading in the different modules.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1208: Add hostname-verification 
at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167302744
 
 

 ##
 File path: pom.xml
 ##
 @@ -138,6 +138,18 @@ flexible messaging model and an intuitive client 
API.
 
   
 
+  
 
 Review comment:
   This dependency should be added at the end of 
`all/src/assemble/LICENSE.bin.txt`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1209: Fixed missing '"' sign in system metrics for Prometheus

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1209: Fixed missing '"' sign in 
system metrics for Prometheus
URL: https://github.com/apache/incubator-pulsar/pull/1209#discussion_r167297551
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 ##
 @@ -85,14 +85,13 @@ private static void 
generateSystemMetrics(SimpleTextOutputStream stream, String
 for (int i = 0; i < metricFamily.samples.size(); i++) {
 Sample sample = metricFamily.samples.get(i);
 stream.write(sample.name);
-stream.write("{cluster=\"").write(cluster).write("\",");
+stream.write("{cluster=\"").write(cluster).write('"');
 
 Review comment:
   Yes, in the previous commit I did add a parser for the Prometheus text 
format to verify the metrics in the unit tests. Though, the parser is regex 
based and it's not validating 100% the format. In this PR I added the check to 
get the value of the tag, so that it was failing before the fix and passing 
after.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed issue #1074: Document features and usage of Pulsar proxy

2018-02-09 Thread GitBox
merlimat closed issue #1074: Document features and usage of Pulsar proxy
URL: https://github.com/apache/incubator-pulsar/issues/1074
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jai1 commented on issue #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
jai1 commented on issue #1208: Add hostname-verification at client tls 
connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364523700
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
rdhabalia commented on a change in pull request #1208: Add 
hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167318158
 
 

 ##
 File path: pom.xml
 ##
 @@ -138,6 +138,18 @@ flexible messaging model and an intuitive client 
API.
 
   
 
+  
 
 Review comment:
   I have added `org.apache.httpcomponents:httpclient` to the client-shaded pom 
and the License file. However, I excluded all other jars from httpclient to avoid 
bringing in a lot of other things from it, but it requires 
`commons-logging:commons-logging` for internal logging; without it we see a 
ClassNotFoundException for the logger class. So, I have added that dep explicitly 
and added it to LICENSE and client-shaded as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jai1 commented on issue #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
jai1 commented on issue #1208: Add hostname-verification at client tls 
connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364523700
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
 ##
 @@ -37,6 +36,16 @@
  */
 byte[] toByteArray();
 
+/**
+ * Get the topic name of this MessageId.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to 
which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   Yes. It is as @merlimat mentioned. For MessageId, we also need `topicName` 
for `redeliverUnacknowledgedMessages`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167251458
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
 ##
 @@ -359,4 +362,59 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map keyMe
 reader.close();
 log.info("-- Exiting {} test --", methodName);
 }
+
+
+@Test
+public void testSimpleReaderReachEndofTopic() throws Exception {
 
 Review comment:
   thanks, will add the test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167258867
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
 
 Review comment:
   HandlerBase seems only useful for ConsumerImpl and ProducerImpl. There was 
an issue opened which plans to remove it from ConsumerBase. 
   Since ConsumerBase has already done some of the work, I would like to leverage it and 
not duplicate that work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to 
make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-364459531
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167261106
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 ##
 @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
 }
 
 @Override
-public void redeliverUnacknowledgedMessages(Set messageIds) 
{
+public void redeliverUnacknowledgedMessages(Set messageIds) {
 
 Review comment:
   Thanks. As in the reply above, we also need this redeliverUnacknowledgedMessages 
method in Consumer.java to handle TopicMessageIdImpl. I would like to change this 
in Consumer.java and make the cast in each child class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   Thanks. UnackedMessageTracker is not only a member of ConsumerImpl; it 
should also be a member of PartitionedConsumer and TopicsConsumer here. Most of 
the change is to leverage MessageId::compareTo to make the code clearer, and 
it seems it does not cast the type too often here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   UnackedMessageTracker is not only a member of ConsumerImpl; it should also be a 
member of PartitionedConsumer and TopicsConsumer here. Most of the change is to 
leverage MessageId::compareTo to make the code clearer, and it seems it does not cast 
the type too often here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1209: Fixed missing '"' sign in system metrics for Prometheus

2018-02-09 Thread GitBox
merlimat closed pull request #1209: Fixed missing '"' sign in system metrics 
for Prometheus
URL: https://github.com/apache/incubator-pulsar/pull/1209
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 26118c81a..e7bcda1b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -85,14 +85,13 @@ private static void 
generateSystemMetrics(SimpleTextOutputStream stream, String
 for (int i = 0; i < metricFamily.samples.size(); i++) {
 Sample sample = metricFamily.samples.get(i);
 stream.write(sample.name);
-stream.write("{cluster=\"").write(cluster).write("\",");
+stream.write("{cluster=\"").write(cluster).write('"');
 for (int j = 0; j < sample.labelNames.size(); j++) {
+stream.write(", ");
 stream.write(sample.labelNames.get(j));
 stream.write("=\"");
 stream.write(sample.labelValues.get(j));
-if (j != sample.labelNames.size() - 1) {
-stream.write("\",");
-}
+stream.write('"');
 }
 
 stream.write("} ");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index be19b571d..8aa6fdc2c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -90,6 +90,11 @@ public void testPerTopicStats() throws Exception {
 assertEquals(cm.get(1).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic1");
 assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+cm = (List) metrics.get("topic_load_times_count");
+assertEquals(cm.size(), 1);
+assertEquals(cm.get(0).value, 2.0);
+assertEquals(cm.get(0).tags.get("cluster"), "test");
+
 p1.close();
 p2.close();
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167372153
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167336300
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -135,6 +135,7 @@ enum ProtocolVersion {
v9 = 9;  // Added end of topic notification
v10 = 10;// Added proxy to broker
v11 = 11;// C++ consumers before this version are not correctly 
handling the checksum field
+//Added get topic's last messageId from broker
 
 Review comment:
   @zhaijack There was a merge issue here, `v11` was already taken in master 
and this PR should be adding `v12` now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy

2018-02-09 Thread GitBox
merlimat commented on issue #1210: Cancel keep-alive timer task after the proxy 
switch to TCP proxy
URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364562235
 
 
   > but right now, shouldn't proxy also handle client's keep-alive request on 
PulsarHandler.handlePing() which should keep connection alive between client to 
proxy and same way proxy to broker?
   
   The proxy just passes along bytes at that point. The ping from client is 
just passed on to broker, which is the one that will respond. If there is a 
network partition either between client/proxy or proxy/broker, it will be 
identified.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy

2018-02-09 Thread GitBox
rdhabalia commented on issue #1210: Cancel keep-alive timer task after the 
proxy switch to TCP proxy
URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364563293
 
 
   yes, once proxy state is in `ProxyConnectionToBroker` then it becomes just 
pass through.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Add hostname-verification at client tls connection (#1208)

2018-02-09 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d3ab43  Add hostname-verification at client tls connection (#1208)
8d3ab43 is described below

commit 8d3ab43cee86c9e49a54db13929a4ecb09e8152f
Author: Rajan Dhabalia 
AuthorDate: Fri Feb 9 17:43:23 2018 -0800

Add hostname-verification at client tls connection (#1208)

* Add hostname-verification at client tls connection

* add httpclient dep with exclude all + add pem in apache-rat

* add httpclient+commons-logging dep in client-shading and LICENSE

* shade artifacts

* fix: proxy send certs to client for host verification
---
 all/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml|  13 ++
 pulsar-broker-shaded/pom.xml   |   6 +
 .../broker/service/PulsarChannelInitializer.java   |  14 ++
 .../AuthenticationTlsHostnameVerificationTest.java | 255 +
 .../tls/hn-verification/broker-cert.pem|  82 +++
 .../tls/hn-verification/broker-key.pem |  28 +++
 .../authentication/tls/hn-verification/cacert.pem  |  79 +++
 .../pulsar-client-kafka/pom.xml|   6 +
 pulsar-client-shaded/pom.xml   |   6 +
 pulsar-client/pom.xml  |  18 ++
 .../pulsar/client/api/ClientConfiguration.java |  18 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  54 -
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +
 .../proxy/server/ServiceChannelInitializer.java|  14 ++
 .../server/ProxyWithProxyAuthorizationTest.java|  51 -
 16 files changed, 642 insertions(+), 6 deletions(-)

diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt
index 41f9000..a7e70de 100644
--- a/all/src/assemble/LICENSE.bin.txt
+++ b/all/src/assemble/LICENSE.bin.txt
@@ -332,6 +332,8 @@ The Apache Software License, Version 2.0
  * Jetty - org.eclipse.jetty-*.jar
  * SnakeYaml -- org.yaml-snakeyaml-*.jar
  * RocksDB - org.rocksdb.*.jar
+ * HttpClient - org.apache.httpcomponents.httpclient.jar
+ * CommonsLogging - commons-logging-*.jar
 
 BSD 3-clause "New" or "Revised" License
  * EA Agent Loader -- com.ea.agentloader-*.jar -- 
licenses/LICENSE-EA-Agent-Loader.txt
diff --git a/pom.xml b/pom.xml
index 27ff691..7320661 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,18 @@ flexible messaging model and an intuitive client 
API.
   
 
   
+org.apache.httpcomponents
+httpclient
+4.5.5
+
+  
+*
+*
+  
+
+  
+  
+  
 org.testng
 testng
 6.13.1
@@ -760,6 +772,7 @@ flexible messaging model and an intuitive client 
API.
 **/*.crt
 **/*.key
 **/*.csr
+**/*.pem
 **/*.json
 **/*.htpasswd
 src/test/resources/athenz.conf.test
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index fd3ff68..bda3037 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -104,6 +104,8 @@
   org.aspectj:*
   com.ea.agentloader:*
   com.wordnik:swagger-annotations
+  org.apache.httpcomponents:httpclient
+  commons-logging:commons-logging
 
   
   
@@ -298,6 +300,10 @@
   com.wordnik
   
org.apache.pulsar.shade.com.worknik
 
+
+  org.apache.http
+  
org.apache.pulsar.shade.org.apache.http
+
   
 
   
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index cd0415a..3138769 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker.service;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
@@ -68,6 +71,17 @@ public class PulsarChannelInitializer extends 
ChannelInitializer
 builder.trustManager(trustCertCollection);
 }
 }
+
+

[GitHub] maskit opened a new pull request #1212: Support custom URL scheme handlers

2018-02-09 Thread GitBox
maskit opened a new pull request #1212: Support custom URL scheme handlers
URL: https://github.com/apache/incubator-pulsar/pull/1212
 
 
   ### Motivation
   
   The Data URL scheme is implemented in the Athenz auth plugin, but it is not 
Athenz-specific and should be usable from other places.
   
   ### Modifications
   
   - Generalize custom URL scheme support
   
   ### Result
   
   - Be able to use Data URL scheme from anywhere
   - Be able to add other custom schemes easily
   
   ### Note
   I set the milestone to 2.0.0, but this change doesn't break compatibility. 
The milestone is just for release scheduling.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382938
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
+return FutureUtil
+.failedFuture(new PulsarClientException
+.NotSupportedException("GetLastMessageId Not supported for 
ProtocolVersion: " +
+cnx().getRemoteEndpointProtocolVersion()));
+}
+
+if (!isConnected()) {
 
 Review comment:
   If it's currently not connected, we should try to mask the exception from 
the user if the failure is transient. 
   
   There is already an `operationTimeout` defined in the client; it would be 
good to have a way to retry internally with backoff up to that amount of time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic

2018-02-09 Thread GitBox
merlimat commented on a change in pull request #1066: Issue 937: add 
CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382822
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
 return seekFuture;
 }
 
+public boolean hasMessageAvailable() throws PulsarClientException {
+try {
+return hasMessageAvailableAsync().get();
+} catch (ExecutionException | InterruptedException e) {
+throw new PulsarClientException(e);
+}
+}
+
+public CompletableFuture hasMessageAvailableAsync() {
+final CompletableFuture booleanFuture = new 
CompletableFuture<>();
+
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+booleanFuture.complete(true);
+} else {
+getLastMessageIdAsync().thenAccept(messageId -> {
+lastMessageIdInBroker = messageId;
+if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) 
{
+booleanFuture.complete(true);
+} else {
+booleanFuture.complete(false);
+}
+}).exceptionally(e -> {
+log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
+booleanFuture.completeExceptionally(e.getCause());
+return null;
+});
+}
+return booleanFuture;
+}
+
+private CompletableFuture getLastMessageIdAsync() {
+if (getState() == State.Closing || getState() == State.Closed) {
+return FutureUtil
+.failedFuture(new 
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+}
+
+if (cnx().getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
 
 Review comment:
   If it's not connected `cnx()` will return null here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] maskit commented on issue #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
maskit commented on issue #1208: Add hostname-verification at client tls 
connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364618475
 
 
   Actually, I was looking into the code and found a lot of duplicated code.
   
   The blocks for `trustManager` and `keyManager` look very much like 
`SecurityUtility::createNettySslContext`. Probably the only difference is 
whether the context is built for a client or for a server.
   
   Using the `SecurityUtility` class would also remove the dependency on 
`org.apache.pulsar.client.impl.auth.AuthenticationDataTls`. `SecurityUtility` 
is what `AuthenticationDataTls` uses internally.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1207: Allow to configure most client/producer/consumer options in Kafka API wrapper

2018-02-09 Thread GitBox
merlimat commented on issue #1207: Allow to configure most 
client/producer/consumer options in Kafka API wrapper
URL: https://github.com/apache/incubator-pulsar/pull/1207#issuecomment-364614000
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group

2018-02-09 Thread GitBox
sijie commented on issue #1156: Introduce ConsumerGroupListener for realizing 
if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-364625065
 
 
   @merlimat I have addressed your comments. Can you review this again? Once 
it looks good, I will rebase it onto the latest master.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group

2018-02-09 Thread GitBox
sijie commented on a change in pull request #1156: Introduce 
ConsumerGroupListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167388187
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 ##
 @@ -127,6 +129,33 @@ public ConsumerConfiguration 
setMessageListener(MessageListener messageListener)
 return this;
 }
 
+/**
+ * @return this configured {@link ConsumerGroupListener} for the consumer.
+ * @see #setConsumerGroupListener(ConsumerGroupListener)
+ * @since 1.22.0
+ */
+public ConsumerGroupListener getConsumerGroupListener() {
 
 Review comment:
   I renamed "ConsumerGroupListener" to "ActiveConsumerListener". 
   
   I added the logic on pulsar client to fail `subscribe` if a listener is set 
for non-failover subscription.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] maskit commented on a change in pull request #1213: Use SecurityUtility class

2018-02-09 Thread GitBox
maskit commented on a change in pull request #1213: Use SecurityUtility class
URL: https://github.com/apache/incubator-pulsar/pull/1213#discussion_r167390938
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 ##
 @@ -55,24 +47,12 @@ public ServiceChannelInitializer(ProxyService 
proxyService, ProxyConfiguration s
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 if (enableTLS) {
-File tlsCert = new File(serviceConfig.getTlsCertificateFilePath());
-File tlsKey = new File(serviceConfig.getTlsKeyFilePath());
-SslContextBuilder builder = SslContextBuilder.forServer(tlsCert, 
tlsKey);
-// allows insecure connection
-builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
-SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
+SslContext sslCtx = 
SecurityUtility.createNettySslContextForClient(true,
 
 Review comment:
   Note that the first argument, `allowInsecureConnection`, is `true` because 
the original code always allows insecure connections regardless of 
configuration. If that wasn't intentional, it needs to be fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] maskit commented on issue #1213: Use SecurityUtility class

2018-02-09 Thread GitBox
maskit commented on issue #1213: Use SecurityUtility class
URL: https://github.com/apache/incubator-pulsar/pull/1213#issuecomment-364630011
 
 
   I know `SecurityUtility` class itself should be refactored too but I'll do 
that later to make this PR simple and easy to review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #1213: Use SecurityUtility class

2018-02-09 Thread GitBox
rdhabalia commented on a change in pull request #1213: Use SecurityUtility class
URL: https://github.com/apache/incubator-pulsar/pull/1213#discussion_r167391135
 
 

 ##
 File path: 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 ##
 @@ -55,24 +47,12 @@ public ServiceChannelInitializer(ProxyService 
proxyService, ProxyConfiguration s
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 if (enableTLS) {
-File tlsCert = new File(serviceConfig.getTlsCertificateFilePath());
-File tlsKey = new File(serviceConfig.getTlsKeyFilePath());
-SslContextBuilder builder = SslContextBuilder.forServer(tlsCert, 
tlsKey);
-// allows insecure connection
-builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
-SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
+SslContext sslCtx = 
SecurityUtility.createNettySslContextForClient(true,
 
 Review comment:
   Can we also add a comment, e.g. 
`SecurityUtility.createNettySslContextForClient(true /* to allow 
InsecureConnection */, ..)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207)

2018-02-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 16a554b  Allow to configure most client/producer/consumer options in 
Kafka API wrapper (#1207)
16a554b is described below

commit 16a554bfd5903482512d9a56ccb21dfd5c01
Author: Matteo Merli 
AuthorDate: Fri Feb 9 21:32:49 2018 -0800

Allow to configure most client/producer/consumer options in Kafka API 
wrapper (#1207)
---
 .../clients/consumer/PulsarKafkaConsumer.java  | 11 ++--
 .../clients/producer/PulsarKafkaProducer.java  |  8 +--
 ...fkaConfig.java => PulsarClientKafkaConfig.java} | 44 ++-
 .../kafka/compat/PulsarConsumerKafkaConfig.java| 50 +
 .../kafka/compat/PulsarProducerKafkaConfig.java| 64 ++
 site/docs/latest/adaptors/KafkaWrapper.md  | 42 +++---
 6 files changed, 203 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index d3dc6e4..97cde46 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -56,7 +56,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -80,6 +81,8 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene
 
 private volatile boolean closed = false;
 
+private final Properties properties;
+
 private static class QueueItem {
 final org.apache.pulsar.client.api.Consumer consumer;
 final Message message;
@@ -141,9 +144,9 @@ public class PulsarKafkaConsumer implements 
Consumer, MessageListene
 
 String serviceUrl = 
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
-Properties properties = new Properties();
+this.properties = new Properties();
 config.originals().forEach((k, v) -> properties.put(k, v));
-ClientConfiguration clientConf = 
PulsarKafkaConfig.getClientConfiguration(properties);
+ClientConfiguration clientConf = 
PulsarClientKafkaConfig.getClientConfiguration(properties);
 // Since this client instance is going to be used just for the 
consumers, we can enable Nagle to group
 // all the acknowledgments sent to broker within a short time frame
 clientConf.setUseTcpNoDelay(false);
@@ -201,7 +204,7 @@ public class PulsarKafkaConsumer implements 
Consumer, MessageListene
 // acknowledgeCumulative()
 int numberOfPartitions = ((PulsarClientImpl) 
client).getNumberOfPartitions(topic).get();
 
-ConsumerConfiguration conf = new ConsumerConfiguration();
+ConsumerConfiguration conf = 
PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
 conf.setSubscriptionType(SubscriptionType.Failover);
 conf.setMessageListener(this);
 if (numberOfPartitions > 1) {
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 793a641..7b8bf9a 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 
 public class 

[GitHub] maskit commented on issue #1208: Add hostname-verification at client tls connection

2018-02-09 Thread GitBox
maskit commented on issue #1208: Add hostname-verification at client tls 
connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364630116
 
 
   Submitted a PR #1213 to address the code duplication.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia opened a new pull request #1214: Support hostname verification on proxy to broker connection

2018-02-09 Thread GitBox
rdhabalia opened a new pull request #1214: Support hostname verification on 
proxy to broker connection
URL: https://github.com/apache/incubator-pulsar/pull/1214
 
 
   ### Motivation
   
   In #1208, we added support for hostname verification on the client when 
it creates a TLS connection with the broker or the proxy.
   However, if the proxy is not on the local network either, it also needs to 
support hostname verification when it connects to the broker.
   
   ### Modifications
   
   Add an option to the proxy that forces it to perform hostname verification 
when it connects to the broker.
   
   ### Result
   
   proxy can support hostname verification when it connects to broker.
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Fixed missing '"' sign in system metrics for Prometheus (#1209)

2018-02-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 50beaca  Fixed missing '"' sign in system metrics for Prometheus 
(#1209)
50beaca is described below

commit 50beacae60cb6ec9384c1fa2b39c4ca3cb7cbe80
Author: Matteo Merli 
AuthorDate: Fri Feb 9 12:30:02 2018 -0800

Fixed missing '"' sign in system metrics for Prometheus (#1209)
---
 .../pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java | 7 +++
 .../java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 5 +
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 26118c8..e7bcda1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -85,14 +85,13 @@ public class PrometheusMetricsGenerator {
 for (int i = 0; i < metricFamily.samples.size(); i++) {
 Sample sample = metricFamily.samples.get(i);
 stream.write(sample.name);
-stream.write("{cluster=\"").write(cluster).write("\",");
+stream.write("{cluster=\"").write(cluster).write('"');
 for (int j = 0; j < sample.labelNames.size(); j++) {
+stream.write(", ");
 stream.write(sample.labelNames.get(j));
 stream.write("=\"");
 stream.write(sample.labelValues.get(j));
-if (j != sample.labelNames.size() - 1) {
-stream.write("\",");
-}
+stream.write('"');
 }
 
 stream.write("} ");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index be19b57..8aa6fdc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -90,6 +90,11 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 assertEquals(cm.get(1).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic1");
 assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+cm = (List) metrics.get("topic_load_times_count");
+assertEquals(cm.size(), 1);
+assertEquals(cm.get(0).value, 2.0);
+assertEquals(cm.get(0).tags.get("cluster"), "test");
+
 p1.close();
 p2.close();
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.


[GitHub] rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy

2018-02-09 Thread GitBox
rdhabalia commented on issue #1210: Cancel keep-alive timer task after the 
proxy switch to TCP proxy
URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364559864
 
 
   > so it's forcefully closing the connection after 60s
   
   But right now, shouldn't the proxy also handle the client's keep-alive 
request in `PulsarHandler.handlePing()`, which should keep the connection alive 
between client and proxy, and in the same way between proxy and broker?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Cancel keep-alive timer task after the proxy switch to TCP proxy (#1210)

2018-02-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f288db5  Cancel keep-alive timer task after the proxy switch to TCP 
proxy (#1210)
f288db5 is described below

commit f288db58b45821ee7f5df2694eada6236afbe1a4
Author: Matteo Merli 
AuthorDate: Fri Feb 9 13:02:28 2018 -0800

Cancel keep-alive timer task after the proxy switch to TCP proxy (#1210)
---
 .../main/java/org/apache/pulsar/common/api/PulsarHandler.java | 11 ---
 .../java/org/apache/pulsar/proxy/server/ProxyConnection.java  |  1 +
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java
index 3d8e7ce..1094359 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java
@@ -67,9 +67,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
 
 @Override
 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-if (keepAliveTask != null) {
-keepAliveTask.cancel(false);
-}
+cancelKeepAliveTask();
 }
 
 @Override
@@ -113,6 +111,13 @@ public abstract class PulsarHandler extends PulsarDecoder {
 }
 }
 
+protected void cancelKeepAliveTask() {
+if (keepAliveTask != null) {
+keepAliveTask.cancel(false);
+keepAliveTask = null;
+}
+}
+
 /**
  * @return true if the connection is ready to use, meaning the Pulsar 
handshake was already completed
  */
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 921376f..68bd022 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -168,6 +168,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener