[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524152
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
 ##
 @@ -75,6 +78,16 @@
  */
 Map getProperties();
 
+/**
+ * Return entry at the position.
+ */
+Entry readEntry(PositionImpl position) throws InterruptedException, 
ExecutionException;
 
 Review comment:
   This can already be done through `replayEntries()` and 
`asyncReplayEntries()` methods


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524486
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 ##
 @@ -330,4 +330,24 @@
  * Set subscriptionInitialPosition for the consumer
 */
 ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition 
subscriptionInitialPosition);
+
+/**
+ * Set maximum number of redelivery.
+ * Message exceeding the maximum number of redelivery will send to Dead 
Letter Topic and acknowledged automatic.
+ * @param maxRedeliveryCount maximum number of redelivery
+ */
+ConsumerBuilder maxRedeliveryCount(int maxRedeliveryCount);
+
+/**
+ * Set name of Dead Letter Topic.
+ * Before set name of Dead Letter Topic, ensure that maxRedeliveryCount > 0
+ * @param deadLetterTopic name of dead letter topic
+ */
+ConsumerBuilder deadLetterTopic(String deadLetterTopic);
+
+/**
+ * Set max un-acked messages per consumer.
+ * This config should less than broker config, if not, config will not 
enable. 0 is not limit, default is 0
+ */
+ConsumerBuilder maxUnackedMessagesPerConsumer(int 
maxUnackedMessagesPerConsumer);
 
 Review comment:
   From API perspective it might be good to encapsulate all the DLQ related 
settings into a single `DeadLetterQueuePolicy` 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
merlimat commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212524714
 
 

 ##
 File path: pulsar-common/src/main/proto/PulsarApi.proto
 ##
 @@ -229,6 +229,16 @@ message CommandSubscribe {
// Signal wthether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];
+   // Maximum number of redeliveries.
+   // Message exceeding the maximum number of redeliveries should send to 
Dead Letter Topic and acknowledged automatic.
+   // Enable this feature by set maxRedeliveryCount > 0
+   optional int32 maxRedeliveryCount = 14;
+   // Name of Dead Letter Topic.
+   // If not set, pulsar broker will generate with topic name and 
subscription name and suffix with -DLQ
+   optional string deadLetterTopic = 15;
 
 Review comment:
   I think it's a bit dangerous to leave the option to have a large number of  
possible topics for which we need to create producer objects inside the broker.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client

2018-08-23 Thread GitBox
merlimat commented on issue #2431: Unable to consume messages, need to 
re-subscribe client
URL: 
https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415654048
 
 
   @bardock From the stats, it seems all partitions have rate 0, and no backlog 
accumulated. I couldn't see any stuck consumer/subscription in there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   Seems it is up to application to decide.  In this change, application can 
choose its own name, and only use “-DLQ” when the application doesn’t specify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni closed pull request #2439: leak assign schema in MessageImpl constructor

2018-08-23 Thread GitBox
srkukarni closed pull request #2439: leak assign schema in MessageImpl 
constructor
URL: https://github.com/apache/incubator-pulsar/pull/2439
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5ea8bfef5e..96b68c1af4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -155,6 +155,7 @@ public MessageImpl(String msgId, Map 
properties, ByteBuf payload
 this.cnx = null;
 this.payload = payload;
 this.properties = Collections.unmodifiableMap(properties);
+this.schema = schema;
 }
 
 public static MessageImpl deserialize(ByteBuf headersAndPayload) 
throws IOException {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: leak assign schema in MessageImpl constructor (#2439)

2018-08-23 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 024d2e0  leak assign schema in MessageImpl constructor (#2439)
024d2e0 is described below

commit 024d2e0ea3b8e7e3b26f5e658b449a0861f3112d
Author: Jia Zhai 
AuthorDate: Fri Aug 24 11:42:55 2018 +0800

leak assign schema in MessageImpl constructor (#2439)
---
 .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5ea8bfe..96b68c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -155,6 +155,7 @@ public class MessageImpl implements Message {
 this.cnx = null;
 this.payload = payload;
 this.properties = Collections.unmodifiableMap(properties);
+this.schema = schema;
 }
 
 public static MessageImpl deserialize(ByteBuf headersAndPayload) 
throws IOException {



[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   Seems it is up to application to decide.  Application can choose its own 
name, and only use “-DLQ” when the application doesn’t specify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514391
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   Any better suggest?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
codelipenghui commented on a change in pull request #2400: PIP-22: Dead Letter 
Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514262
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
 
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
 
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
 
 Review comment:
   Ok!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212513063
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
 
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
 
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
 
 Review comment:
   +1, @codelipenghui, we may need following the way  of `messagesToReplay`, 
which store ledgerId and entryId as primitive value, and then compose a 
Position from(ledgerId + entryId) when using it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] massakam commented on issue #2401: Consumer often discards received messages

2018-08-23 Thread GitBox
massakam commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415634865
 
 
   Sorry, OpenMessaging benchmark tool has not yet supported TLS...
   We are using the tool modified to enable TLS.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai opened a new pull request #2439: leak assign schema in MessageImpl constructor

2018-08-23 Thread GitBox
jiazhai opened a new pull request #2439: leak assign schema in MessageImpl 
constructor
URL: https://github.com/apache/incubator-pulsar/pull/2439
 
 
   
   schema was not assigned in MessageImpl constructor, this change try to fix 
it.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia closed pull request #2437: Race condition function-runtime-manager read old assignments

2018-08-23 Thread GitBox
rdhabalia closed pull request #2437: Race condition function-runtime-manager 
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e481..a6fda6f8b6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public String getSubscription() {
 return subscription;
 }
 
+public String getConsumerName() {
+return this.consumerName;
+}
+
 /**
  * Redelivers the given unacknowledged messages. In Failover mode, the 
request is ignored if the consumer is not
  * active for the given topic. In Shared mode, the consumers messages to 
be redelivered are distributed across all
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 7994ef33d4..b18fd12881 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -53,7 +54,7 @@
 public class MembershipManager implements AutoCloseable, ConsumerEventListener 
{
 
 private final String consumerName;
-private final Consumer consumer;
+private final ConsumerImpl consumer;
 private final WorkerConfig workerConfig;
 private PulsarAdmin pulsarAdminClient;
 private final CompletableFuture firstConsumerEventFuture;
@@ -68,9 +69,9 @@
 @VisibleForTesting
 Map unsignedFunctionDurations = new HashMap<>();
 
-MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+MembershipManager(WorkerService service, PulsarClient client)
 throws PulsarClientException {
-this.workerConfig = workerConfig;
+this.workerConfig = service.getWorkerConfig();
 consumerName = String.format(
 "%s:%s:%d",
 workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@
 // we don't produce any messages into this topic, we only use the 
`failover` subscription
 // to elect an active consumer as the leader worker. The leader worker 
will be responsible
 // for scheduling snapshots for FMT and doing task assignment.
-consumer = client.newConsumer()
+consumer = (ConsumerImpl) client.newConsumer()
 .topic(workerConfig.getClusterCoordinationTopic())
 .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
 .subscriptionType(SubscriptionType.Failover)
 .consumerEventListener(this)
 .property(WORKER_IDENTIFIER, consumerName)
 .subscribe();
+
+isLeader.set(checkLeader(service, consumer.getConsumerName()));
 }
 
 @Override
@@ -282,4 +285,19 @@ private PulsarAdmin getPulsarAdminClient() {
 return this.pulsarAdminClient;
 }
 
+private boolean checkLeader(WorkerService service, String consumerName) {
+try {
+TopicStats stats = service.getBrokerAdmin().topics()
+
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+String activeConsumerName = stats != null
+&& 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+? 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+: null;
+return consumerName != null && 
consumerName.equalsIgnoreCase(activeConsumerName);
+} catch (Exception e) {
+log.warn("Failed to check leader {}", e.getMessage());
+}
+return false;
+}
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766e8d..7fc0cc94d7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/wo

[incubator-pulsar] branch master updated: Race condition function-runtime-manager read old assignments (#2437)

2018-08-23 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 672a167  Race condition function-runtime-manager read old assignments 
(#2437)
672a167 is described below

commit 672a167a31c719dab7623770d39666e8d7e7a0fd
Author: Rajan Dhabalia 
AuthorDate: Thu Aug 23 18:34:44 2018 -0700

Race condition function-runtime-manager read old assignments (#2437)
---
 .../apache/pulsar/client/impl/ConsumerBase.java|  4 
 .../pulsar/functions/worker/MembershipManager.java | 26 ++
 .../pulsar/functions/worker/WorkerService.java |  2 +-
 .../functions/worker/MembershipManagerTest.java| 24 
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e..a6fda6f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public abstract class ConsumerBase extends HandlerState 
implements Consumer consumer;
+private final ConsumerImpl consumer;
 private final WorkerConfig workerConfig;
 private PulsarAdmin pulsarAdminClient;
 private final CompletableFuture firstConsumerEventFuture;
@@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 @VisibleForTesting
 Map unsignedFunctionDurations = new HashMap<>();
 
-MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+MembershipManager(WorkerService service, PulsarClient client)
 throws PulsarClientException {
-this.workerConfig = workerConfig;
+this.workerConfig = service.getWorkerConfig();
 consumerName = String.format(
 "%s:%s:%d",
 workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 // we don't produce any messages into this topic, we only use the 
`failover` subscription
 // to elect an active consumer as the leader worker. The leader worker 
will be responsible
 // for scheduling snapshots for FMT and doing task assignment.
-consumer = client.newConsumer()
+consumer = (ConsumerImpl) client.newConsumer()
 .topic(workerConfig.getClusterCoordinationTopic())
 .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
 .subscriptionType(SubscriptionType.Failover)
 .consumerEventListener(this)
 .property(WORKER_IDENTIFIER, consumerName)
 .subscribe();
+
+isLeader.set(checkLeader(service, consumer.getConsumerName()));
 }
 
 @Override
@@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 return this.pulsarAdminClient;
 }
 
+private boolean checkLeader(WorkerService service, String consumerName) {
+try {
+TopicStats stats = service.getBrokerAdmin().topics()
+
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+String activeConsumerName = stats != null
+&& 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+? 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+: null;
+return consumerName != null && 
consumerName.equalsIgnoreCase(activeConsumerName);
+} catch (Exception e) {
+log.warn("Failed to check leader {}", e.getMessage());
+}
+return false;
+}
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766..7fc0cc9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService {
 this.connectorsManager = new ConnectorsManager(workerConfig);
 
 //create membership manager
-this.membershipManager = new MembershipManager(this.workerConfig, 
this.client);
+this.membershipManager = new MembershipManager(this, this.client);
 
 // create function runtime manager
 this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsa

[GitHub] jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics

2018-08-23 Thread GitBox
jerrypeng commented on issue #2438: Fix: Function assignment can support large 
number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415621190
 
 
   @rdhabalia it would be nice to keep all the functions related topics in one 
namespace, but i guess its not that big of a deal


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia closed pull request #2436: Fix: failover sub keep sending invalid ack

2018-08-23 Thread GitBox
rdhabalia closed pull request #2436: Fix: failover sub keep sending invalid ack
URL: https://github.com/apache/incubator-pulsar/pull/2436
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0fefe9c182..5240bfe8ec 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -164,6 +164,8 @@ public void flush() {
 ByteBuf cmd = Commands.newAck(consumer.consumerId, 
lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
 AckType.Cumulative, null, Collections.emptyMap());
 cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+return;
 }
 
 // Flush all individual acks


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212497198
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
 
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
 
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
 
 Review comment:
   umm.. this can cause in high gc for broker. Essentially we would like to 
avoid storing objects in heap with relatively some what long life-cycle. 
therefore, in past release we had an effort to clean up storing PositionImp 
objects from code base. so, for broker serving high throughput with low latency 
requirement might want to having this feature.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
rdhabalia commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212497283
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   what if tenant already has topic with postfix `DLQ`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics

2018-08-23 Thread GitBox
rdhabalia commented on issue #2438: Fix: Function assignment can support large 
number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415615536
 
 
   > we can also use topic compaction to delete old assignment messages
   
   yes, with that we can address performance issue but it will not address 
scalability issue because we can't publish >5MB assignment-message with 
2500-functions as per above example. This PR has a simple and straight forward 
approach to ack previous message and we can avoid additional hook for 
compaction.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #2438: Fix: Function assignment can support large number of topics

2018-08-23 Thread GitBox
jerrypeng commented on issue #2438: Fix: Function assignment can support large 
number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-415613914
 
 
   @rdhabalia we can also use topic compaction to delete old assignment messages


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments

2018-08-23 Thread GitBox
jerrypeng commented on issue #2437: Race condition function-runtime-manager 
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415613741
 
 
   @rdhabalia gotcha thanks for explanation


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2437: Race condition function-runtime-manager read old assignments

2018-08-23 Thread GitBox
rdhabalia commented on issue #2437: Race condition function-runtime-manager 
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415612533
 
 
   >  In the scheduleManager, we have a check before invoking actual scheduling:
   
   yes, exactly 
[SchedulerManager](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L93)
 tries to schedule only if it is a leader and at that time MemberShipManager 
has not received that notification  of leader so, schedulerManager skips 
scheduling and RuntimeManager reads last available assignment.
   I am able to reproduce it in standalone function worker where 
standalone-scheduler-manager skips scheduling as it finds `isLeader()` false.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415611753
 
 
   @rdhabalia Currently /workers doesn't redirect either. /functions might.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #2437: Race condition function-runtime-manager read old assignments

2018-08-23 Thread GitBox
jerrypeng commented on issue #2437: Race condition function-runtime-manager 
read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437#issuecomment-415611324
 
 
   @rdhabalia I am a little confused on how the schedule manager can schedule 
something before membership manager receives notification that it is the 
leader.  In the scheduleManager, we have a check before invoking actual 
scheduling:
   
   
https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L93
   
   Am I missing something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
rdhabalia commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415610192
 
 
   > wrt having a seperate endpoint, I'm not sure if its really needs to go in 
/worker-stats?
   
   actually, motivation behind having separate endpoint `workers` and 
`worker-stats` is `worker-stats` will always serve localhost stats and this 
resource can't redirect or serve for other workers. and that was the reason we 
have broker-stats to serve broker's local resources. so, I thought we can keep 
that consistency. @merlimat any thoughts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia opened a new pull request #2438: Fix: Function assignment can support large number of topics

2018-08-23 Thread GitBox
rdhabalia opened a new pull request #2438: Fix: Function assignment can support 
large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438
 
 
   ### Motivation: 
   Pulsar function assignment has scalability and performance issue.
   
   Right now, 
[SchedulerManager](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java#L154)
 publishes all assignments into one pulsar message. Here, each assignment 
generates 700 bytes of payload. Pulsar has limitation with pulsar messages-size 
which is around 5MB. Now, if each function is running with 3 instances then it 
requires 2KB of payload so, function can only support around 2500 functions in 
the cluster.
   
   Also, assignment event is something that happens more frequent in the system 
which can be triggered on any assignment change or worker restart. So, over 
period of time, we can expect large number of assignment messages stored across 
many ledgers in the system and every time worker restart, it requires to read 
all those very old ledgers from BK which is something we would also definitely 
like to avoid.
   
   **Note:** 
   We can easily reproduce it by registering function with parallelism=12000 
which will fail to publish assignment message. 
   
   ### Modification
   
   1. Publish multiple messages (each message with limited number of 
assignments) to include all assignments to support any number of function 
assignments
   2. Acks the message  for old version of assignments (which requires separate 
namespace for assignment which won't have infinite retention configured)
   3. Broker deletes ledgers for old assignments and assignment-reader doesn't 
have to read such ledgers.
   
   ### Result
   1. Pulsar function can support any number of functions in the system
   2. Assignment-manager doesn't read old assignments so, broker and bookie can 
avoid unnecessary read and dispatching
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2435: improve error message handling

2018-08-23 Thread GitBox
srkukarni commented on issue #2435: improve error message handling
URL: https://github.com/apache/incubator-pulsar/pull/2435#issuecomment-415607320
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni closed pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase

2018-08-23 Thread GitBox
srkukarni closed pull request #2432: Make all cli arguments follow consistent - 
notation instead of camelcase
URL: https://github.com/apache/incubator-pulsar/pull/2432
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 1019853c9a..f31fd82468 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -277,7 +277,9 @@ void processArguments() throws Exception {
 protected String DEPRECATED_userConfigString;
 @Parameter(names = "--user-config", description = "User-defined config 
key/values")
 protected String userConfigString;
-@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order", hidden = true)
+protected Boolean DEPRECATED_retainOrdering;
+@Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
 protected boolean retainOrdering;
 @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
 protected Integer parallelism;
@@ -329,8 +331,9 @@ private void mergeArgs() {
 if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) 
customSerdeInputString = DEPRECATED_customSerdeInputString;
 
 if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = 
DEPRECATED_fnConfigFile;
-if (DEPRECATED_processingGuarantees != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = 
DEPRECATED_processingGuarantees;
+if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
 if (!StringUtils.isBlank(DEPRECATED_userConfigString)) 
userConfigString = DEPRECATED_userConfigString;
+if (DEPRECATED_retainOrdering != null) retainOrdering = 
DEPRECATED_retainOrdering;
 if (DEPRECATED_windowLengthCount != null) windowLengthCount = 
DEPRECATED_windowLengthCount;
 if (DEPRECATED_windowLengthDurationMs != null) 
windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
 if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount 
= DEPRECATED_slidingIntervalCount;
@@ -825,6 +828,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 CmdFunctions.startLocalRun(convertProto2(functionConfig), 
functionConfig.getParallelism(),
 instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
@@ -1023,6 +1027,7 @@ public void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (triggerFile == null && triggerValue == null) {
 throw new ParameterException("Either a trigger value or a 
trigger filepath needs to be specified");
@@ -1057,6 +1062,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(sourceFile)) {
 throw new ParameterException("--source-file needs to be 
specified");
@@ -1091,6 +1097,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(destinationFile)) {
 throw new ParameterException("--destination-file needs to be 
specified");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 52f30e64d5..5259996ca7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -118,29 +118,56 @@ void processArguments() throws Exception {
 @Parameters(commandDescription = "Run a Pulsar IO sink connector locally 
(rather than deploying it to the Pulsar cluster)")
 protected class LocalSinkRunner extends CreateSink {
 
-@Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
+  

[incubator-pulsar] branch master updated: Make all cli arguments follow consistent - notation instead of camelcase (#2432)

2018-08-23 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 225eeb7  Make all cli arguments follow consistent - notation instead 
of camelcase (#2432)
225eeb7 is described below

commit 225eeb70ed6e2f88783997a891fec642d1b7baac
Author: Sanjeev Kulkarni 
AuthorDate: Thu Aug 23 16:28:21 2018 -0700

Make all cli arguments follow consistent - notation instead of camelcase 
(#2432)
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 11 ++-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 87 ++
 .../org/apache/pulsar/admin/cli/CmdSources.java| 77 +++
 3 files changed, 145 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 1019853..f31fd82 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -277,7 +277,9 @@ public class CmdFunctions extends CmdBase {
 protected String DEPRECATED_userConfigString;
 @Parameter(names = "--user-config", description = "User-defined config 
key/values")
 protected String userConfigString;
-@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order", hidden = true)
+protected Boolean DEPRECATED_retainOrdering;
+@Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
 protected boolean retainOrdering;
 @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
 protected Integer parallelism;
@@ -329,8 +331,9 @@ public class CmdFunctions extends CmdBase {
 if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) 
customSerdeInputString = DEPRECATED_customSerdeInputString;
 
 if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = 
DEPRECATED_fnConfigFile;
-if (DEPRECATED_processingGuarantees != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = 
DEPRECATED_processingGuarantees;
+if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
 if (!StringUtils.isBlank(DEPRECATED_userConfigString)) 
userConfigString = DEPRECATED_userConfigString;
+if (DEPRECATED_retainOrdering != null) retainOrdering = 
DEPRECATED_retainOrdering;
 if (DEPRECATED_windowLengthCount != null) windowLengthCount = 
DEPRECATED_windowLengthCount;
 if (DEPRECATED_windowLengthDurationMs != null) 
windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
 if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount 
= DEPRECATED_slidingIntervalCount;
@@ -825,6 +828,7 @@ public class CmdFunctions extends CmdBase {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 CmdFunctions.startLocalRun(convertProto2(functionConfig), 
functionConfig.getParallelism(),
 instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
@@ -1023,6 +1027,7 @@ public class CmdFunctions extends CmdBase {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (triggerFile == null && triggerValue == null) {
 throw new ParameterException("Either a trigger value or a 
trigger filepath needs to be specified");
@@ -1057,6 +1062,7 @@ public class CmdFunctions extends CmdBase {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(sourceFile)) {
 throw new ParameterException("--source-file needs to be 
specified");
@@ -1091,6 +1097,7 @@ public class CmdFunctions extends CmdBase {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(destinationFile)) {
 throw new ParameterException("--destination-file needs to be 
specified");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 52f30e6..5259996 100644
--- 
a/pulsar-client-tools/src/mai

[GitHub] rdhabalia opened a new pull request #2437: Race condition function-runtime-manager read old assignments

2018-08-23 Thread GitBox
rdhabalia opened a new pull request #2437: Race condition 
function-runtime-manager read old assignments
URL: https://github.com/apache/incubator-pulsar/pull/2437
 
 
   ### Motivation
   
   Right now, In function, schedulerManager tries to schedule assignments 
before MemebershipManager receives leader notification from broker. So, 
leader-worker's schedulerManager failed to assign new assignment and 
RuntimeManager read old assignment message and applies old assignment which 
might not be valid.
   
   ### Modifications
   
   Fix race condition by setting correct leader at `MemebershipManager` before 
function tries to schedule assignments.
   
   ### Result
   
   Function worker will not apply old assignments.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Made Twitter FireHose to emit records containing schema information (#2433)

2018-08-23 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d6a0ce0  Made Twitter FireHose to emit records containing schema 
information (#2433)
d6a0ce0 is described below

commit d6a0ce092fd3f3d09ecfbe81d26b71d7b94b03e2
Author: Sanjeev Kulkarni 
AuthorDate: Thu Aug 23 16:13:10 2018 -0700

Made Twitter FireHose to emit records containing schema information (#2433)
---
 .../org/apache/pulsar/io/twitter/TweetData.java| 121 +
 .../apache/pulsar/io/twitter/TwitterFireHose.java  |  21 ++--
 2 files changed, 134 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
new file mode 100644
index 000..3e5503d
--- /dev/null
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.twitter;
+
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class TweetData {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private RetweetedStatus retweetedStatus;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+private String timestampMs;
+private Delete delete;
+
+@Data
+public static class User {
+private Long id;
+private String idStr;
+private String name;
+private String screenName;
+private String location;
+private String description;
+private String translatorType;
+private Boolean _protected;
+private Boolean verified;
+private Long followersCount;
+private Long friendsCount;
+private Long listedCount;
+private Long favouritesCount;
+private Long statusesCount;
+private String createdAt;
+private Boolean geoEnabled;
+private String lang;
+private Boolean contributorsEnabled;
+private Boolean isTranslator;
+private String profileBackgroundColor;
+private String profileBackgroundImageUrl;
+private String profileBackgroundImageUrlHttps;
+private Boolean profileBackgroundTile;
+private String profileLinkColor;
+private String profileSidebarBorderColor;
+private String profileSidebarFillColor;
+private String profileTextColor;
+private Boolean profileUseBackgroundImage;
+private String profileImageUrl;
+private String profileImageUrlHttps;
+private String profileBannerUrl;
+private Boolean defaultProfile;
+private Boolean defaultProfileImage;
+}
+@Data
+public static class Url {
+private String url;
+private String expandedUrl;
+private String displayUrl;
+private List indices = null;
+}
+@Data
+public static class RetweetedStatus {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+}
+@Data
+public static class Status {
+private Long id;
+private String idStr;
+private Long userId;
+private String userIdStr;
+}
+@Data
+p

[GitHub] srkukarni closed pull request #2433: Made Twitter FireHose to emit records containing schema information

2018-08-23 Thread GitBox
srkukarni closed pull request #2433: Made Twitter FireHose to emit records 
containing schema information
URL: https://github.com/apache/incubator-pulsar/pull/2433
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
new file mode 100644
index 00..3e5503d8cc
--- /dev/null
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.twitter;
+
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class TweetData {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private RetweetedStatus retweetedStatus;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+private String timestampMs;
+private Delete delete;
+
+@Data
+public static class User {
+private Long id;
+private String idStr;
+private String name;
+private String screenName;
+private String location;
+private String description;
+private String translatorType;
+private Boolean _protected;
+private Boolean verified;
+private Long followersCount;
+private Long friendsCount;
+private Long listedCount;
+private Long favouritesCount;
+private Long statusesCount;
+private String createdAt;
+private Boolean geoEnabled;
+private String lang;
+private Boolean contributorsEnabled;
+private Boolean isTranslator;
+private String profileBackgroundColor;
+private String profileBackgroundImageUrl;
+private String profileBackgroundImageUrlHttps;
+private Boolean profileBackgroundTile;
+private String profileLinkColor;
+private String profileSidebarBorderColor;
+private String profileSidebarFillColor;
+private String profileTextColor;
+private Boolean profileUseBackgroundImage;
+private String profileImageUrl;
+private String profileImageUrlHttps;
+private String profileBannerUrl;
+private Boolean defaultProfile;
+private Boolean defaultProfileImage;
+}
+@Data
+public static class Url {
+private String url;
+private String expandedUrl;
+private String displayUrl;
+private List indices = null;
+}
+@Data
+public static class RetweetedStatus {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+}
+@Data
+public static class Status {
+private Long id;
+private String idStr;
+private Long userId;
+private String userIdStr;
+}
+@Data
+public static class Delete {
+private Status status;
+private String timestampMs;
+}
+}
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 76ea4ac250..2631db1796 100644
--- 

[GitHub] rdhabalia opened a new pull request #2436: Fix: failover sub keep sending invalid ack

2018-08-23 Thread GitBox
rdhabalia opened a new pull request #2436: Fix: failover sub keep sending 
invalid ack
URL: https://github.com/apache/incubator-pulsar/pull/2436
 
 
   ### Motivation
   
   Consumer with FailOver-Subscription is not reseting `lastCumulativeAck` so, 
ack-tracker task keeps sending invalid ack to broker.
   
   ### Modifications
   
   - Reset `lastCumulativeAck` and return to avoid sending duplicate ack.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #2435: improve error message handling

2018-08-23 Thread GitBox
jerrypeng opened a new pull request #2435: improve error message handling
URL: https://github.com/apache/incubator-pulsar/pull/2435
 
 
   ### Motivation
   
   Improve error message handling in presto pulsar connector
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase

2018-08-23 Thread GitBox
srkukarni commented on issue #2432: Make all cli arguments follow consistent - 
notation instead of camelcase
URL: https://github.com/apache/incubator-pulsar/pull/2432#issuecomment-415585124
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aahmed-se opened a new pull request #2434: Add pulsar flink connectors

2018-08-23 Thread GitBox
aahmed-se opened a new pull request #2434: Add pulsar flink connectors
URL: https://github.com/apache/incubator-pulsar/pull/2434
 
 
   We Introduce a module for simple pulsar connectors for flink.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni closed pull request #2317: Make twitter source to emit records with schema info

2018-08-23 Thread GitBox
srkukarni closed pull request #2317: Make twitter source to emit records with 
schema info
URL: https://github.com/apache/incubator-pulsar/pull/2317
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
new file mode 100644
index 00..fab936e3a2
--- /dev/null
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter;
+
+import java.util.List;
+
+import lombok.Data;
+
+@Data
+public class TweetData {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private RetweetedStatus retweetedStatus;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+private String timestampMs;
+
+private Delete delete;
+
+@Data
+public static class User {
+private Long id;
+private String idStr;
+private String name;
+private String screenName;
+private String location;
+private String description;
+private String translatorType;
+private Boolean _protected;
+private Boolean verified;
+private Long followersCount;
+private Long friendsCount;
+private Long listedCount;
+private Long favouritesCount;
+private Long statusesCount;
+private String createdAt;
+private Boolean geoEnabled;
+private String lang;
+private Boolean contributorsEnabled;
+private Boolean isTranslator;
+private String profileBackgroundColor;
+private String profileBackgroundImageUrl;
+private String profileBackgroundImageUrlHttps;
+private Boolean profileBackgroundTile;
+private String profileLinkColor;
+private String profileSidebarBorderColor;
+private String profileSidebarFillColor;
+private String profileTextColor;
+private Boolean profileUseBackgroundImage;
+private String profileImageUrl;
+private String profileImageUrlHttps;
+private String profileBannerUrl;
+private Boolean defaultProfile;
+private Boolean defaultProfileImage;
+}
+
+@Data
+public static class Url {
+private String url;
+private String expandedUrl;
+private String displayUrl;
+private List indices = null;
+}
+
+@Data
+public static class RetweetedStatus {
+private String createdAt;
+private Long id;
+private String idStr;
+private String text;
+private String source;
+private Boolean truncated;
+private User user;
+private Boolean isQuoteStatus;
+private Long quoteCount;
+private Long replyCount;
+private Long retweetCount;
+private Long favoriteCount;
+private Boolean favorited;
+private Boolean retweeted;
+private String filterLevel;
+private String lang;
+}
+
+@Data
+public static class Status {
+private Long id;
+private String idStr;
+private Long userId;
+private String userIdStr;
+}
+
+@Data
+public static class Delete {
+private Status status;
+private String timestampMs;
+}
+}
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 76ea4ac250..eef9077427 100644
--- 
a/pu

[GitHub] srkukarni commented on issue #2317: Make twitter source to emit records with schema info

2018-08-23 Thread GitBox
srkukarni commented on issue #2317: Make twitter source to emit records with 
schema info
URL: https://github.com/apache/incubator-pulsar/pull/2317#issuecomment-415577626
 
 
   closing in favor of https://github.com/apache/incubator-pulsar/pull/2433


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #2433: Made Twitter FireHose to emit records containing schema information

2018-08-23 Thread GitBox
srkukarni opened a new pull request #2433: Made Twitter FireHose to emit 
records containing schema information
URL: https://github.com/apache/incubator-pulsar/pull/2433
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455850
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
 ##
 @@ -0,0 +1,35 @@
+package org.apache.pulsar.tests.integration.containers;
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455767
 
 

 ##
 File path: 
pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
 ##
 @@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class HdfsSinkConfigTests {
+   
+   @Rule
+public ExpectedException thrown = ExpectedException.none();
+
+   @Test
+   public final void loadFromYamlFileTest() throws IOException {
+   File yamlFile = getFile("sinkConfig.yaml");
 
 Review comment:
   Fixed and added "contrib-check" profile to main pom.xml file to validate 
check style compliance


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455521
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
+   
+   private static final long serialVersionUID = 1L;
+   
+   /**
+* A file or comma separated list of files which contains the Hadoop 
file system configuration. Without this, Hadoop
+ * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' 
file or will revert to a default configuration.
+*/
+   protected String hdfsConfigResources;
+   
+   /**
+* The HDFS directory from which files should be read from or written to
+*/
+   protected String directory;
+   
+   /**
+* The character encoding for the files, e.g. UTF-8, ASCII, etc.
+*/
+   protected String encoding;
+   
+   /**
+* The compression codec used to compress/de-compress the files on 
HDFS. 
+*/
+   protected String compression;
+   
+   /**
+* The Kerberos user principal account to use for authentication
+*/
+   protected String kerberosUserPrincipal;
+   
+   /**
+* The full pathname to the Kerberos keytab file to use for 
authentication.
+*/
+   protected String keytab;
+   
+   public void validate() {
+   if (StringUtils.isEmpty(hdfsConfigResources) || 
StringUtils.isEmpty(directory) )
+   throw new IllegalArgumentException("Required property 
not set.");
+   
+   if (StringUtils.isNotEmpty(compression)) {
+   if (!Stream.of(BZIP2, DEFLATE, GZIP, LZ4, 
SNAPPY).anyMatch(compression::equalsIgnoreCase)) {
+   throw new IllegalArgumentException("Invalid 
Compression code specified. Valid values are 'BZip2', 'Deflate', 'Gzip', 'Lz4', 
or 'Snappy'");
+   }
+   }
+   
+   if ( (StringUtils.isNotEmpty(kerberosUserPrincipal) && 
StringUtils.isEmpty(keytab)) ||
+(StringUtils.isEmpty(kerberosUserPrincipal) && 
StringUtils.isNotEmpty(keytab)) ) {
+   throw new IllegalArgumentException("Values for both 
kerberosUserPrincipal & keytab are required.");
+   }
+   }
+
+public CompressionCodec getCompressionCodec() {
+   if (StringUtils.isBlank(compression)) 
+   return null;
+   
+   if (compression.equalsIgnoreCase(BZIP2))
+   return new BZip2Codec();
+   
+   if (compression.equalsIgnoreCase(DEFLATE))
+   return new DeflateCodec();
+   
+   if (compression.equalsIgnoreCase(GZIP))
+   return new GzipCodec();
+   
+   if (compression.equalsIgnoreCase(LZ4))
+   return new Lz4Codec();
+   
+   if (compression.equalsIgnoreCase(SNAPPY))
+   return new SnappyCodec();
+   
+   return null;
+}
+
+   

[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455382
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455416
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
+   
+   private static final long serialVersionUID = 1L;
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-23 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r212455487
 
 

 ##
 File path: 
pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+   public static final String BZIP2 = "BZip2";
+   public static final String DEFLATE = "Deflate";
+   public static final String GZIP = "Gzip";
+   public static final String LZ4 = "Lz4";
+   public static final String SNAPPY = "Snappy";
+   
+   private static final long serialVersionUID = 1L;
+   
+   /**
+* A file or comma separated list of files which contains the Hadoop 
file system configuration. Without this, Hadoop
+ * will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' 
file or will revert to a default configuration.
+*/
+   protected String hdfsConfigResources;
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415564305
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2432: Make all cli arguments follow consistent - notation instead of camelcase

2018-08-23 Thread GitBox
srkukarni commented on issue #2432: Make all cli arguments follow consistent - 
notation instead of camelcase
URL: https://github.com/apache/incubator-pulsar/pull/2432#issuecomment-415564129
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415541657
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2381: Make source and sink cli args format consistent

2018-08-23 Thread GitBox
srkukarni commented on issue #2381: Make source and sink cli args format 
consistent
URL: https://github.com/apache/incubator-pulsar/pull/2381#issuecomment-415539137
 
 
   This is superseded by https://github.com/apache/incubator-pulsar/pull/2432


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni closed pull request #2381: Make source and sink cli args format consistent

2018-08-23 Thread GitBox
srkukarni closed pull request #2381: Make source and sink cli args format 
consistent
URL: https://github.com/apache/incubator-pulsar/pull/2381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5f52313d57..1a7d1e96e1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -268,7 +268,10 @@ void processArguments() throws Exception {
 protected String DEPRECATED_userConfigString;
 @Parameter(names = "--user-config", description = "User-defined config 
key/values")
 protected String userConfigString;
-@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+// for backwards compatibility purposes
+@Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order", hidden = true)
+protected Boolean DEPRECATED_retainOrdering;
+@Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
 protected boolean retainOrdering;
 @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
 protected Integer parallelism;
@@ -319,8 +322,9 @@ private void mergeArgs() {
 if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) 
outputSerdeClassName = DEPRECATED_outputSerdeClassName;
 if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) 
customSerdeInputString = DEPRECATED_customSerdeInputString;
 if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = 
DEPRECATED_fnConfigFile;
-if (DEPRECATED_processingGuarantees != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = 
DEPRECATED_processingGuarantees;
+if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
 if (!StringUtils.isBlank(DEPRECATED_userConfigString)) 
userConfigString = DEPRECATED_userConfigString;
+if (DEPRECATED_retainOrdering != null) retainOrdering = 
DEPRECATED_retainOrdering;
 if (DEPRECATED_windowLengthCount != null) windowLengthCount = 
DEPRECATED_windowLengthCount;
 if (DEPRECATED_windowLengthDurationMs != null) 
windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
 if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount 
= DEPRECATED_slidingIntervalCount;
@@ -787,6 +791,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 CmdFunctions.startLocalRun(convertProto2(functionConfig), 
functionConfig.getParallelism(),
 instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
@@ -964,6 +969,7 @@ public void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (triggerFile == null && triggerValue == null) {
 throw new ParameterException("Either a trigger value or a 
trigger filepath needs to be specified");
@@ -998,6 +1004,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(sourceFile)) {
 throw new ParameterException("--source-file needs to be 
specified");
@@ -1032,6 +1039,7 @@ private void mergeArgs() {
 
 @Override
 void runCmd() throws Exception {
+// merge deprecated args with new args
 mergeArgs();
 if (StringUtils.isBlank(destinationFile)) {
 throw new ParameterException("--destination-file needs to be 
specified");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 8417874f38..5ccd9b0853 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -24,23 +24,9 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
-import java.io.File;
-import java.io.FileOutputStream;
-imp

[GitHub] srkukarni opened a new pull request #2432: Make all cli arguments follow consistent - notation instead of camelcase

2018-08-23 Thread GitBox
srkukarni opened a new pull request #2432: Make all cli arguments follow 
consistent - notation instead of camelcase
URL: https://github.com/apache/incubator-pulsar/pull/2432
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration

2018-08-23 Thread GitBox
jerrypeng commented on a change in pull request #2428: Package Presto with 
Pulsar for SQL integration
URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212421574
 
 

 ##
 File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties
 ##
 @@ -0,0 +1,5 @@
+connector.name=pulsar
 
 Review comment:
   good idea!  I was just trying to keep the original presto project structure


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Support heartbeat function for worker (#2424)

2018-08-23 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 31edf5a  Support heartbeat function for worker (#2424)
31edf5a is described below

commit 31edf5aa74f3c4005a228ef463df05ecb2cee287
Author: Rajan Dhabalia 
AuthorDate: Thu Aug 23 11:20:15 2018 -0700

Support heartbeat function for worker (#2424)
---
 .../pulsar/functions/worker/WorkerConfig.java  |  2 +-
 .../worker/scheduler/RoundRobinScheduler.java  | 17 ++-
 .../functions/worker/SchedulerManagerTest.java | 57 ++
 3 files changed, 74 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 38ef5d3..0f695a9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -143,7 +143,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
 
 public String getWorkerId() {
 if (StringUtils.isBlank(this.workerId)) {
-this.workerId = getWorkerHostname();
+this.workerId = String.format("%s-%s", this.getWorkerHostname(), 
this.getWorkerPort());
 }
 return this.workerId;
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 4f9ad62..58c1a9a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.scheduler;
 
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
 import java.util.HashMap;
@@ -29,6 +30,9 @@ import java.util.stream.Collectors;
 
 public class RoundRobinScheduler implements IScheduler {
 
+public static final String HEARTBEAT_TENANT = "pulsar-function";
+public static final String HEARTBEAT_NAMESPACE = "heartbeat";
+
 @Override
 public List schedule(List 
unassignedFunctionInstances, List
 currentAssignments, List workers) {
@@ -44,7 +48,8 @@ public class RoundRobinScheduler implements IScheduler {
 }
 
 for (Instance unassignedFunctionInstance : 
unassignedFunctionInstances) {
-String workerId = findNextWorker(workerIdToAssignment);
+String heartBeatWorkerId = 
checkHeartBeatFunction(unassignedFunctionInstance);
+String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : 
findNextWorker(workerIdToAssignment);
 Assignment newAssignment = 
Assignment.newBuilder().setInstance(unassignedFunctionInstance)
 .setWorkerId(workerId).build();
 workerIdToAssignment.get(workerId).add(newAssignment);
@@ -57,6 +62,16 @@ public class RoundRobinScheduler implements IScheduler {
 return assignments;
 }
 
+private static String checkHeartBeatFunction(Instance funInstance) {
+if (funInstance.getFunctionMetaData() != null
+&& funInstance.getFunctionMetaData().getFunctionDetails() != 
null) {
+FunctionDetails funDetails = 
funInstance.getFunctionMetaData().getFunctionDetails();
+return HEARTBEAT_TENANT.equals(funDetails.getTenant())
+&& HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? 
funDetails.getName() : null;
+}
+return null;
+}
+
 private String findNextWorker(Map> 
workerIdToAssignment) {
 String targetWorkerId = null;
 int least = Integer.MAX_VALUE;
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 97e9c36..19977bd 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 import org

[GitHub] rdhabalia closed pull request #2424: Support heartbeat function for worker

2018-08-23 Thread GitBox
rdhabalia closed pull request #2424: Support heartbeat function for worker
URL: https://github.com/apache/incubator-pulsar/pull/2424
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 38ef5d3a6c..0f695a974f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws 
IOException {
 
 public String getWorkerId() {
 if (StringUtils.isBlank(this.workerId)) {
-this.workerId = getWorkerHostname();
+this.workerId = String.format("%s-%s", this.getWorkerHostname(), 
this.getWorkerPort());
 }
 return this.workerId;
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 4f9ad62b30..58c1a9a513 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.scheduler;
 
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
 import java.util.HashMap;
@@ -29,6 +30,9 @@
 
 public class RoundRobinScheduler implements IScheduler {
 
+public static final String HEARTBEAT_TENANT = "pulsar-function";
+public static final String HEARTBEAT_NAMESPACE = "heartbeat";
+
 @Override
 public List schedule(List 
unassignedFunctionInstances, List
 currentAssignments, List workers) {
@@ -44,7 +48,8 @@
 }
 
 for (Instance unassignedFunctionInstance : 
unassignedFunctionInstances) {
-String workerId = findNextWorker(workerIdToAssignment);
+String heartBeatWorkerId = 
checkHeartBeatFunction(unassignedFunctionInstance);
+String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : 
findNextWorker(workerIdToAssignment);
 Assignment newAssignment = 
Assignment.newBuilder().setInstance(unassignedFunctionInstance)
 .setWorkerId(workerId).build();
 workerIdToAssignment.get(workerId).add(newAssignment);
@@ -57,6 +62,16 @@
 return assignments;
 }
 
+private static String checkHeartBeatFunction(Instance funInstance) {
+if (funInstance.getFunctionMetaData() != null
+&& funInstance.getFunctionMetaData().getFunctionDetails() != 
null) {
+FunctionDetails funDetails = 
funInstance.getFunctionMetaData().getFunctionDetails();
+return HEARTBEAT_TENANT.equals(funDetails.getTenant())
+&& HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? 
funDetails.getName() : null;
+}
+return null;
+}
+
 private String findNextWorker(Map> 
workerIdToAssignment) {
 String targetWorkerId = null;
 int least = Integer.MAX_VALUE;
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 97e9c365a5..19977bd812 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -47,6 +47,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
@@ -554,6 +555,62 @@ public void testScalingDown() throws Exception {
 );
 }
 
+@Test
+public void testHeartbeatFunction() throws Exception {
+List functionMetaDataList = new 
LinkedList<>();
+final long version = 5;
+final String workerId1 = "host-workerId-1";
+final String workerId2 = "host-workerId-2";
+Function.FunctionMetaData function1 = 
Fu

[GitHub] rdhabalia commented on a change in pull request #2424: Support heartbeat function for worker

2018-08-23 Thread GitBox
rdhabalia commented on a change in pull request #2424: Support heartbeat 
function for worker
URL: https://github.com/apache/incubator-pulsar/pull/2424#discussion_r212406508
 
 

 ##
 File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 ##
 @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws 
IOException {
 
 public String getWorkerId() {
 if (StringUtils.isBlank(this.workerId)) {
-this.workerId = getWorkerHostname();
+this.workerId = String.format("%s-%s", this.getWorkerHostname(), 
this.getWorkerPort());
 
 Review comment:
   it was a bug actually, if multiple worker instances start on the same host 
but different port then it doesn't create unique workerId and it was resulting 
into incorrect assignment. Therefore, it is part of bug-fix also.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bardock commented on issue #2431: Unable to consume messages, need to re-subscribe client

2018-08-23 Thread GitBox
bardock commented on issue #2431: Unable to consume messages, need to 
re-subscribe client
URL: 
https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415509377
 
 
   Here they are: 
[stats.zip](https://github.com/apache/incubator-pulsar/files/2315609/stats.zip)
   Thanks!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415505950
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2424: Support heartbeat function for worker

2018-08-23 Thread GitBox
srkukarni commented on a change in pull request #2424: Support heartbeat 
function for worker
URL: https://github.com/apache/incubator-pulsar/pull/2424#discussion_r212389886
 
 

 ##
 File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 ##
 @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws 
IOException {
 
 public String getWorkerId() {
 if (StringUtils.isBlank(this.workerId)) {
-this.workerId = getWorkerHostname();
+this.workerId = String.format("%s-%s", this.getWorkerHostname(), 
this.getWorkerPort());
 
 Review comment:
   This change has implications for other places where both workerId and 
workerport are being used. Maybe add a seperate function?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2431: Unable to consume messages, need to re-subscribe client

2018-08-23 Thread GitBox
merlimat commented on issue #2431: Unable to consume messages, need to 
re-subscribe client
URL: 
https://github.com/apache/incubator-pulsar/issues/2431#issuecomment-415497842
 
 
   @bardock Can you shared topics stats : `pulsar-admin persistent stats 
$TOPIC` and internal stats  `pulsar-admin persistent stats-internal $TOPIC`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration

2018-08-23 Thread GitBox
sijie commented on a change in pull request #2428: Package Presto with Pulsar 
for SQL integration
URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212386472
 
 

 ##
 File path: bin/pulsar
 ##
 @@ -99,6 +103,24 @@ if [ ! -f "${PY_INSTANCE_FILE}" ]; then
 PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
 fi
 
+# find pulsar sql presto distribution location
+if [ ! -d "${PRESTO_HOME}" ]; then
+
+#  BUILT_PRESTO_HOME="${SQL_HOME}/presto-server/target/presto-server-0.206"
 
 Review comment:
   remove this line


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration

2018-08-23 Thread GitBox
sijie commented on a change in pull request #2428: Package Presto with Pulsar 
for SQL integration
URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212387727
 
 

 ##
 File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties
 ##
 @@ -0,0 +1,5 @@
+connector.name=pulsar
 
 Review comment:
   any reason why we can't place /etc/catalog/pulsar.properties to 
conf/presto/catalog/pulsar.properties? then we have a central place for 
managing all configuration files for pulsar components.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2428: Package Presto with Pulsar for SQL integration

2018-08-23 Thread GitBox
sijie commented on a change in pull request #2428: Package Presto with Pulsar 
for SQL integration
URL: https://github.com/apache/incubator-pulsar/pull/2428#discussion_r212387884
 
 

 ##
 File path: pulsar-sql/presto-server/src/resources/etc/catalog/pulsar.properties
 ##
 @@ -0,0 +1,5 @@
+connector.name=pulsar
 
 Review comment:
   basically `/etc` -> `conf/presto`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bardock opened a new issue #2431: Unable to consume messages, need to re-subscribe client

2018-08-23 Thread GitBox
bardock opened a new issue #2431: Unable to consume messages, need to 
re-subscribe client
URL: https://github.com/apache/incubator-pulsar/issues/2431
 
 
   This is similar to issue 
[2013](https://github.com/apache/incubator-pulsar/issues/2013). We have several 
shared subscriptions of partitioned topics and we found some cases where a 
subscription's client is not receiving messages, but it's not related to a 
specific topic partition. 
   In this case, if we re-subscribe the client (`PulsarClient.subscribeAsync`), 
messages start to be delivered as expected.
   
   Client logs this warn:
   ```
   [log_time:16:17:42.377] [thread:pulsar-timer-6-1] [level:WARN ] 
[logger:UnAckedMessageTracker] - 
[ConsumerBase{subscription='picking-consumer-prod.fbm-wms-picking', 
consumerName='i-08e97d991d75e9d07-10.73.125.156', 
topic='persistent://fury/global/wms-wave-feed-prod.fbm-wms-picking/wms-wave-feed-prod.fbm-wms-picking'}]
 1 messages have timed-out
   ```
   
   And we don't see any error from brokers.
   
   Any idea? Any log or metric that may be useful to debug this problem?
   
    System configuration
   **Pulsar version (brokers)**: 1.20.0
   **Pulsar clients version**: 1.22.0
   **Bookkeeper version**: 4.7.1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages

2018-08-23 Thread GitBox
ivankelly commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415495334
 
 
   I have a local repro, will look into it more in the morning.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bardock commented on issue #2013: Unable to consume messages from a partition

2018-08-23 Thread GitBox
bardock commented on issue #2013: Unable to consume messages from a partition
URL: 
https://github.com/apache/incubator-pulsar/issues/2013#issuecomment-415487704
 
 
   @sijie any thoughts?
   We are now using version 4.7.1 and we are tracking almost every bookkeeper 
metric.
   What metrics do you think will be useful to debug this issue?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on issue #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#issuecomment-415484255
 
 
   @rdhabalia I've corrected the mistake of taking out functionsmetrics.
   wrt having a seperate endpoint, I'm not sure if its really needs to go in 
/worker-stats? /worker/metrics and /worker/functionsmetrics should do the same 
job? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on a change in pull request #2426: Added cli commands to 
get function cluster related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#discussion_r212377386
 
 

 ##
 File path: 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 ##
 @@ -37,7 +36,7 @@ private Resources() {
 return new HashSet<>(
 Arrays.asList(
 FunctionApiV2Resource.class,
-WorkerStats.class,
+WorkerApiV2Resource.class,
 
 Review comment:
   /worker is strictly v2. I see no point in adding a v1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2426: Added cli commands to get function cluster related information

2018-08-23 Thread GitBox
srkukarni commented on a change in pull request #2426: Added cli commands to 
get function cluster related information
URL: https://github.com/apache/incubator-pulsar/pull/2426#discussion_r212377268
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 ##
 @@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.admin.v2;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-@Path("/worker-stats")
-public class WorkerStats extends FunctionApiResource {
-
-@GET
-@Path("/functions")
 
 Review comment:
   An oversight on my part while moving. I've added it but moved to 
/worker/functionsmetrics to better reflect what the call does


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] massakam commented on issue #2401: Consumer often discards received messages

2018-08-23 Thread GitBox
massakam commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415382210
 
 
   @ivankelly
   > Do you have a reliable repro, or do you just see it happening regularly in 
production?
   
   In our environment, it can be reproduced by using the [OpenMessaging 
benchmark tool](https://github.com/openmessaging/openmessaging-benchmark) and 
applying the following load:
   ```
   topics: 1
   partitionsPerTopic: 1
   messageSize: 1024
   payloadFile: "payload/payload-1Kb.data"
   subscriptionsPerTopic: 100
   producersPerTopic: 1
   producerRate: 1000
   consumerBacklogSizeGB: 0
   testDurationMinutes: 2
   ```
   It seems that it tends to occur when there are many subscriptions. It also 
occurred several times in production.
   
   > Is TLS auth enabled, or just TLS transport?
   
   We are not using TLS auth.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages

2018-08-23 Thread GitBox
ivankelly commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415368815
 
 
   What rate are messages arriving at the topic in question?
   Is the client processing messages for just this topic, or other topics also?
   
   I'm trying to get a feel for how we could repro this locally.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on issue #2401: Consumer often discards received messages

2018-08-23 Thread GitBox
ivankelly commented on issue #2401: Consumer often discards received messages
URL: 
https://github.com/apache/incubator-pulsar/issues/2401#issuecomment-415368282
 
 
   Looks like a bytebuf issue. When reading a bytebuf, we often do things like 
reading the size from the packet and setting the writeIndex to the current 
writeIndex + size, so that it can be passed into the protobuf parser. This is 
why the write index seems so messed up. Do you have a reliable repro, or do you 
just see it happening regularly in production? 
   
   Is TLS auth enabled, or just TLS transport? this is going to be a bytebuf 
pooling issue of some sort with TLs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format

2018-08-23 Thread GitBox
sijie commented on issue #2430: Namespaces admin commands should be able to 
distinguish v1 and v2 namespace format
URL: 
https://github.com/apache/incubator-pulsar/issues/2430#issuecomment-415353663
 
 
   the problem is namespace admin commands don't distinguish v1 and v2 format. 
so set-backlog-quotas is setting the quota in a wrong namespace.  since the v2 
topic name is limited to (tenant)/(namespace)/(topic), we should be able to 
distinguish v1 and v2 namespace name in admin commands. if it has only 2 
components `(tenant)/(topic)`, it is a v2 namespace. we can simply disallow v1 
namespace name in namespace admin commands.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new issue #2430: Namespaces admin commands should be able to distinguish v1 and v2 namespace format

2018-08-23 Thread GitBox
sijie opened a new issue #2430: Namespaces admin commands should be able to 
distinguish v1 and v2 namespace format
URL: https://github.com/apache/incubator-pulsar/issues/2430
 
 
    Expected behavior
   
   - setup a 2.0.0 cluster
   - enable backlog : set-backlog-quotas test-tenant/test-cluster/ns1 --limit 
500G --policy producer_exception
   - run perf: pulsar-perf produce 
persistent://test-tenant/test-cluster/ns1/test-topic
   - pulsar perf should be able to produce 500G data
   
    Actual behavior
   
   - pulsar perf encounters backlog exception when producing around ~10GB data
   
    Steps to reproduce
   
   as described in "expected behavior"
   
    System configuration
   **Pulsar version**: 2.0.0
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed issue #2429: delete inactive topics should not delete topics that have retention set

2018-08-23 Thread GitBox
sijie closed issue #2429: delete inactive topics should not delete topics that 
have retention set
URL: https://github.com/apache/incubator-pulsar/issues/2429
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2429: delete inactive topics should not delete topics that have retention set

2018-08-23 Thread GitBox
sijie commented on issue #2429: delete inactive topics should not delete topics 
that have retention set
URL: 
https://github.com/apache/incubator-pulsar/issues/2429#issuecomment-415351488
 
 
   sorry I think this is a false alarm. closing the issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services