[GitHub] yush1ga commented on issue #1352: Delete inactive subscriptions automatically

2018-04-09 Thread GitBox
yush1ga commented on issue #1352: Delete inactive subscriptions automatically
URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-379975158
 
 
   @merlimat @nkurihar 
   I added `lastActive` to `ManagedCursor`.
   We can detect the last time that consumers connect or disconnect in the 
subscription and it is used for deleting inactive subscription.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1529: Fix delete topic check and provide better error message.

2018-04-09 Thread GitBox
merlimat closed pull request #1529: Fix delete topic check and provide better 
error message.
URL: https://github.com/apache/incubator-pulsar/pull/1529
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 03a435d9c5..863272d5c6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -470,8 +470,10 @@ protected void internalDeleteTopic(boolean authoritative) {
 // v2 topics have a global name so check if the topic is replicated.
 if (topic.isReplicated()) {
 // Delete is disallowed on global topic
-log.error("[{}] Delete topic is forbidden on global namespace {}", 
clientAppId(), topicName);
-throw new RestException(Status.FORBIDDEN, "Delete forbidden on 
global namespace");
+final List clusters = topic.getReplicators().keys();
+log.error("[{}] Delete forbidden topic {} is replicated on 
clusters {}",
+clientAppId(), topicName, clusters);
+throw new RestException(Status.FORBIDDEN, "Delete forbidden topic 
is replicated on clusters " + clusters);
 }
 
 try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2d63fb6d11..c34ec84ba8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1507,7 +1507,7 @@ public boolean isEncryptionRequired() {
 
 @Override
 public boolean isReplicated() {
-return replicators.size() > 1;
+return !replicators.isEmpty();
 }
 
 public CompletableFuture terminate() {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Fix delete topic check and provide better error message. (#1529)

2018-04-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new dc9a9dc  Fix delete topic check and provide better error message. 
(#1529)
dc9a9dc is described below

commit dc9a9dc808dcea35d5570ad5c6d8d22183ce0a7d
Author: cckellogg 
AuthorDate: Mon Apr 9 19:23:32 2018 -0700

Fix delete topic check and provide better error message. (#1529)

* Fix delete topic check and provide better error message.

* Update error message.

* Update log to match exception.
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java   | 6 --
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 03a435d..863272d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -470,8 +470,10 @@ public class PersistentTopicsBase extends AdminResource {
 // v2 topics have a global name so check if the topic is replicated.
 if (topic.isReplicated()) {
 // Delete is disallowed on global topic
-log.error("[{}] Delete topic is forbidden on global namespace {}", 
clientAppId(), topicName);
-throw new RestException(Status.FORBIDDEN, "Delete forbidden on 
global namespace");
+final List clusters = topic.getReplicators().keys();
+log.error("[{}] Delete forbidden topic {} is replicated on 
clusters {}",
+clientAppId(), topicName, clusters);
+throw new RestException(Status.FORBIDDEN, "Delete forbidden topic 
is replicated on clusters " + clusters);
 }
 
 try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2d63fb6..c34ec84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1507,7 +1507,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
 
 @Override
 public boolean isReplicated() {
-return replicators.size() > 1;
+return !replicators.isEmpty();
 }
 
 public CompletableFuture terminate() {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.


[GitHub] srkukarni commented on issue #1534: Implement Cassandra Sink

2018-04-09 Thread GitBox
srkukarni commented on issue #1534: Implement Cassandra Sink
URL: https://github.com/apache/incubator-pulsar/pull/1534#issuecomment-379947340
 
 
   @sijie @merlimat 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #1534: Implement Cassandra Sink

2018-04-09 Thread GitBox
srkukarni opened a new pull request #1534: Implement Cassandra Sink
URL: https://github.com/apache/incubator-pulsar/pull/1534
 
 
   ### Motivation
   
   Using the Pulsar Connect Sink interface, implemented a simple cassandra sink
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
jerrypeng commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379946725
 
 
   I am ok with users specifying functions via fqfn but currently 
FunctionConfig already has tenant, namespace, and name fields.  Adding another 
field fqfn would be redundant and create multiple sources of truth which can 
easily result in bugs later on.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379943562
 
 
   Currently `FunctionConfig` is used  for 1) stored as part of function 
metadata, and 2) loading yaml config 2. That means this question has two parts:
   
   1) do we need FQFN for function metadata
   2) do we want user to be able to configure FQFN in yaml file?
   
   I think it is clear that we don't want to store FQFN in as part of function 
metadata (which I think that is the original purpose of this change).
   
   for the second question 2), I think FQFN might be convenient on command 
line, but not very sure about yaml file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic

2018-04-09 Thread GitBox
zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle 
lookup for partitioned topic
URL: https://github.com/apache/incubator-pulsar/pull/1527#issuecomment-379942086
 
 
   @merlimat , Thanks.  From the test logs, ownership acquire seems not take to 
much time, but loadNamespaceTopics takes some time. 
   The linked list is a concurrent link, seems work-able in test, maybe there 
is some race condition that not considered. 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #1533: Refactor JavaInstanceRunnable

2018-04-09 Thread GitBox
sijie opened a new pull request #1533: Refactor JavaInstanceRunnable
URL: https://github.com/apache/incubator-pulsar/pull/1533
 
 
   *Motivation*
   
   EffectivelyOnce processing made `JavaInstanceRunnable` very complicated. 
There are tons of braching logic. It makes code hard to maintain.
   
   *Solution*
   
   Abstract the processing guarantee related logic into a `MessageProcessor` 
interface. Implement `at-most-once`, `at-least-once` and `effectively-once` 
processors.
   
   *Result*
   
   After this change, `JavaInstanceRunnable` is much cleaner. `At-Least-Once` 
and `At-Most-Once` logic are much simpler and easier to debug.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic

2018-04-09 Thread GitBox
zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle 
lookup for partitioned topic
URL: https://github.com/apache/incubator-pulsar/pull/1527#issuecomment-379942086
 
 
   @merlimat , Thanks.  From the test logs, ownership acquire seems not take to 
much time, but loadNamespaceTopics takes some time. 
   The linked list is a concurrent link, seems work-able in test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1526: make connection timeout config-able for consumer and producer

2018-04-09 Thread GitBox
zhaijack commented on issue #1526: make connection timeout config-able for 
consumer and producer
URL: https://github.com/apache/incubator-pulsar/pull/1526#issuecomment-379941198
 
 
   Thanks @merlimat , make a mistake here. will close this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack closed pull request #1526: make connection timeout config-able for consumer and producer

2018-04-09 Thread GitBox
zhaijack closed pull request #1526: make connection timeout config-able for 
consumer and producer
URL: https://github.com/apache/incubator-pulsar/pull/1526
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a446dd4567..ccdfaf78fd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -184,7 +184,9 @@
 }
 
 this.connectionHandler = new ConnectionHandler(this,
-new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, 
TimeUnit.MILLISECONDS),
+new Backoff(100, TimeUnit.MILLISECONDS,
+client.getConfiguration().getOperationTimeoutMs(), 
TimeUnit.MILLISECONDS,
+0, TimeUnit.MILLISECONDS),
 this);
 
 grabCnx();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 92a512de2b..901a465a6e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -181,7 +181,9 @@ public ProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfiguration
 }
 
 this.connectionHandler = new ConnectionHandler(this,
-new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 
Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+new Backoff(100, TimeUnit.MILLISECONDS,
+client.getConfiguration().getOperationTimeoutMs(), 
TimeUnit.MILLISECONDS,
+Math.max(100, conf.getSendTimeoutMs() - 100), 
TimeUnit.MILLISECONDS),
 this);
 grabCnx();
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview

2018-04-09 Thread GitBox
sijie commented on issue #1503: Pulsar Functions triggering overview
URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379939993
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
srkukarni commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379933269
 
 
   In that case, I'm now questioning the viability of having this feature. 
Having tenant/namespace/name AND fqn clearly pollutes/increases the surface 
area unnecessarily imo. @jerrypeng @sijie do you guys have thoughts on this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Pulsar Connect (#1520)

2018-04-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 027b424  Pulsar Connect (#1520)
027b424 is described below

commit 027b424fd914db306bc3a9223ca2b36030421ce8
Author: Sanjeev Kulkarni 
AuthorDate: Mon Apr 9 17:08:06 2018 -0700

Pulsar Connect (#1520)

* Added Pulsar Connect interfaces that define connectors that push data 
into pulsar and take data from pulsar

* Added Twitter connector

* Added hbc core version mapping

* Addressed comments

* Fixed build

* Fixed license header
---
 pom.xml|   3 +
 pulsar-connect/core/pom.xml|  33 
 .../org/apache/pulsar/connect/core/PushSource.java |  52 +++
 .../java/org/apache/pulsar/connect/core/Sink.java  |  49 ++
 pulsar-connect/pom.xml |  39 +
 pulsar-connect/twitter/pom.xml |  61 
 .../pulsar/connect/twitter/TwitterFireHose.java| 167 +
 .../connect/twitter/TwitterFireHoseConfig.java |  63 
 8 files changed, 467 insertions(+)

diff --git a/pom.xml b/pom.xml
index 30b3d5e..2ea3f52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@ flexible messaging model and an intuitive client 
API.
 pulsar-log4j2-appender
 
 pulsar-functions
+
+pulsar-connect
   
 
   
@@ -139,6 +141,7 @@ flexible messaging model and an intuitive client 
API.
 2.8.2
 0.8.3
 2.1.1
+2.2.0
 
 
 3.4.0
diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml
new file mode 100644
index 000..dea6d62
--- /dev/null
+++ b/pulsar-connect/core/pom.xml
@@ -0,0 +1,33 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-core
+  Pulsar Connect :: Connect
+
+
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
new file mode 100644
index 000..65b006b
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Pulsar's Push Source interface. PushSource read data from
+ * external sources(database changes, twitter firehose, etc)
+ * and publish to a Pulsar topic. The reason its called Push is
+ * because PushSources get passed a consumption Function that they
+ * invoke whenever they have data to be published to Pulsar.
+ * The lifcycle of a PushSource is to open it passing any config needed
+ * by it to initialize(like open network connection, authenticate, etc).
+ * A consumer Function is then to it which is invoked by the source whenever
+ * there is data to be published. Once all data has been read, one can use 
close
+ * at the end of the session to do any cleanup
+ */
+public interface PushSource extends AutoCloseable {
+/**
+ * Open connector with configuration
+ *
+ * @param config initialization config
+ * @throws Exception IO type exceptions when opening a connector
+ */
+void open(final Map config) throws Exception;
+
+/**
+ * Attach a consumer function to this Source. This is invoked by the 
implementation
+ * to pass messages whenever there is data to be pushed to Pulsar.
+ * @param consumer
+ */
+void setConsumer(Function consumer);
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 

[GitHub] merlimat closed pull request #1520: Pulsar Connect

2018-04-09 Thread GitBox
merlimat closed pull request #1520: Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 30b3d5e5e2..2ea3f5235c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@ flexible messaging model and an intuitive client 
API.
 pulsar-log4j2-appender
 
 pulsar-functions
+
+pulsar-connect
   
 
   
@@ -139,6 +141,7 @@ flexible messaging model and an intuitive client 
API.
 2.8.2
 0.8.3
 2.1.1
+2.2.0
 
 
 3.4.0
diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml
new file mode 100644
index 00..dea6d626cc
--- /dev/null
+++ b/pulsar-connect/core/pom.xml
@@ -0,0 +1,33 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-core
+  Pulsar Connect :: Connect
+
+
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
new file mode 100644
index 00..65b006bf6f
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Pulsar's Push Source interface. PushSource read data from
+ * external sources(database changes, twitter firehose, etc)
+ * and publish to a Pulsar topic. The reason its called Push is
+ * because PushSources get passed a consumption Function that they
+ * invoke whenever they have data to be published to Pulsar.
+ * The lifcycle of a PushSource is to open it passing any config needed
+ * by it to initialize(like open network connection, authenticate, etc).
+ * A consumer Function is then to it which is invoked by the source whenever
+ * there is data to be published. Once all data has been read, one can use 
close
+ * at the end of the session to do any cleanup
+ */
+public interface PushSource extends AutoCloseable {
+/**
+ * Open connector with configuration
+ *
+ * @param config initialization config
+ * @throws Exception IO type exceptions when opening a connector
+ */
+void open(final Map config) throws Exception;
+
+/**
+ * Attach a consumer function to this Source. This is invoked by the 
implementation
+ * to pass messages whenever there is data to be pushed to Pulsar.
+ * @param consumer
+ */
+void setConsumer(Function consumer);
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
new file mode 100644
index 00..e22eb0f20b
--- /dev/null
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for 

[GitHub] lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
lucperkins commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379930880
 
 
   @srkukarni I feel like FQFN should be settable either via the command line 
*and* via YAML or neither. Inconsistent interfaces will likely sow a lot of 
unnecessary confusion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
srkukarni commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379929218
 
 
   @lucperkins My feeling is that this is should be a command line helper 
rather than something in the fqn. Thus I would vote it to be removed from 
protobuf, even if that means that in the config file, we need to specify 
tenant/namespace/name


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
lucperkins commented on a change in pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180263273
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -117,9 +115,31 @@ void processArguments() throws Exception {}
  * Function level command
  */
 @Getter
-abstract class FunctionCommand extends NamespaceCommand {
-@Parameter(names = "--name", description = "The function's name", 
required = true)
+abstract class FunctionCommand extends BaseCommand {
+@Parameter(names = "--fqfn", description = "The Fully Qualified 
Function Name (FQFN) for the function")
+protected String fqfn;
+
+@Parameter(names = "--tenant", description = "The function's tenant")
+protected String tenant;
+
+@Parameter(names = "--namespace", description = "The function's 
namespace")
+protected String namespace;
+
+@Parameter(names = "--name", description = "The function's name")
 protected String functionName;
+
+@Override
+void processArguments() throws Exception {
 
 Review comment:
   Okay, never mind, I understand what you mean now. Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
lucperkins commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379927094
 
 
   @jerrypeng As far as I can tell this *is* necessary for users to be able to 
specify FQFN in a function's YAML config, like so:
   
   ```yaml
   jar: my-jar.jar
   className: org.example.functions.MyFunction
   inputs:
   - persistent://sample/standalone/ns1/in
   output: persistent://sample/standalone/ns1/out
   fqfn: sample/ns1/my-func
   ```
   
   At the moment, the `FunctionConfig` class that's generated from that 
Protobuf file powers the whole YAML config system. I'm not sure if that's 
optimal but for now it's the system that's in place.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
lucperkins commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379927094
 
 
   @jerrypeng As far as I can tell this *is* necessary for users to be able to 
specify FQFN in a function's YAML config, like so:
   
   ```yaml
   jar: my-jar.jar
   className: org.example.functions.MyFunction
   inputs:
   - persistent://sample/standalone/ns1/in
   output: persistent://sample/standalone/ns1/out
   fqfn: sample/ns1/my-func
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379919691
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1530: remove unused proto declaration

2018-04-09 Thread GitBox
sijie closed pull request #1530: remove unused proto declaration
URL: https://github.com/apache/incubator-pulsar/pull/1530
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 542d847c4d..ab4a726432 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -69,11 +69,6 @@ message FunctionMetaData {
 uint64 createTime = 4;
 }
 
-message Snapshot {
-repeated FunctionMetaData functionMetaDataList = 1;
-bytes lastAppliedMessageId = 2;
-}
-
 message Instance {
 FunctionMetaData functionMetaData = 1;
 int32 instanceId = 2;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: remove unused proto declaration (#1530)

2018-04-09 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 99bd812  remove unused proto declaration (#1530)
99bd812 is described below

commit 99bd812e000510a86fd39fca396fc058fc27f673
Author: Boyang Jerry Peng 
AuthorDate: Mon Apr 9 15:58:15 2018 -0700

remove unused proto declaration (#1530)
---
 pulsar-functions/proto/src/main/proto/Function.proto | 5 -
 1 file changed, 5 deletions(-)

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 542d847..ab4a726 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -69,11 +69,6 @@ message FunctionMetaData {
 uint64 createTime = 4;
 }
 
-message Snapshot {
-repeated FunctionMetaData functionMetaDataList = 1;
-bytes lastAppliedMessageId = 2;
-}
-
 message Instance {
 FunctionMetaData functionMetaData = 1;
 int32 instanceId = 2;

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.


[GitHub] sijie commented on issue #1532: Support JSON in schema registry

2018-04-09 Thread GitBox
sijie commented on issue #1532: Support JSON in schema registry
URL: 
https://github.com/apache/incubator-pulsar/issues/1532#issuecomment-379918852
 
 
   /cc @mgodave 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new issue #1532: Support JSON in schema registry

2018-04-09 Thread GitBox
sijie opened a new issue #1532: Support JSON in schema registry
URL: https://github.com/apache/incubator-pulsar/issues/1532
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
lucperkins commented on a change in pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180254078
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -117,9 +115,31 @@ void processArguments() throws Exception {}
  * Function level command
  */
 @Getter
-abstract class FunctionCommand extends NamespaceCommand {
-@Parameter(names = "--name", description = "The function's name", 
required = true)
+abstract class FunctionCommand extends BaseCommand {
+@Parameter(names = "--fqfn", description = "The Fully Qualified 
Function Name (FQFN) for the function")
+protected String fqfn;
+
+@Parameter(names = "--tenant", description = "The function's tenant")
+protected String tenant;
+
+@Parameter(names = "--namespace", description = "The function's 
namespace")
+protected String namespace;
+
+@Parameter(names = "--name", description = "The function's name")
 protected String functionName;
+
+@Override
+void processArguments() throws Exception {
 
 Review comment:
   I agree but in this case `FunctionCommand`s, `NamespaceCommand`s, and 
`FunctionConfigCommand`s really do require separate logic. I'm not sure I can 
see a way around that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
lucperkins commented on a change in pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180251671
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -117,9 +115,31 @@ void processArguments() throws Exception {}
  * Function level command
  */
 @Getter
-abstract class FunctionCommand extends NamespaceCommand {
-@Parameter(names = "--name", description = "The function's name", 
required = true)
+abstract class FunctionCommand extends BaseCommand {
+@Parameter(names = "--fqfn", description = "The Fully Qualified 
Function Name (FQFN) for the function")
+protected String fqfn;
+
+@Parameter(names = "--tenant", description = "The function's tenant")
+protected String tenant;
+
+@Parameter(names = "--namespace", description = "The function's 
namespace")
+protected String namespace;
+
+@Parameter(names = "--name", description = "The function's name")
 protected String functionName;
+
+@Override
+void processArguments() throws Exception {
+if (null != fqfn) {
+String[] fqfnArray = fqfn.split("/");
+if (fqfnArray.length != 3) {
+throw new IllegalArgumentException("Fully qualified 
function names (FQFNs) must be of the form tenant/namespace/name");
+}
+tenant = fqfnArray[0];
 
 Review comment:
   Well, in this case you can *either* set a FQFN *or* tenant + namespace + 
name. So I think that I'll change this to make sure users don't attempt to 
apply both systems.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1520: Pulsar Connect

2018-04-09 Thread GitBox
srkukarni commented on issue #1520: Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1520#issuecomment-379910681
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview

2018-04-09 Thread GitBox
sijie commented on issue #1503: Pulsar Functions triggering overview
URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379903856
 
 
   @lucperkins can you merge the latest master? this PR has conflicts with 
master.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
sijie commented on a change in pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180238543
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -117,9 +115,31 @@ void processArguments() throws Exception {}
  * Function level command
  */
 @Getter
-abstract class FunctionCommand extends NamespaceCommand {
-@Parameter(names = "--name", description = "The function's name", 
required = true)
+abstract class FunctionCommand extends BaseCommand {
+@Parameter(names = "--fqfn", description = "The Fully Qualified 
Function Name (FQFN) for the function")
+protected String fqfn;
+
+@Parameter(names = "--tenant", description = "The function's tenant")
+protected String tenant;
+
+@Parameter(names = "--namespace", description = "The function's 
namespace")
+protected String namespace;
+
+@Parameter(names = "--name", description = "The function's name")
 protected String functionName;
+
+@Override
+void processArguments() throws Exception {
 
 Review comment:
   nit: it is good to call super.processArguments() in general. just in case 
people modified the logic in BaseCommand in future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
sijie commented on a change in pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180239050
 
 

 ##
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -117,9 +115,31 @@ void processArguments() throws Exception {}
  * Function level command
  */
 @Getter
-abstract class FunctionCommand extends NamespaceCommand {
-@Parameter(names = "--name", description = "The function's name", 
required = true)
+abstract class FunctionCommand extends BaseCommand {
+@Parameter(names = "--fqfn", description = "The Fully Qualified 
Function Name (FQFN) for the function")
+protected String fqfn;
+
+@Parameter(names = "--tenant", description = "The function's tenant")
+protected String tenant;
+
+@Parameter(names = "--namespace", description = "The function's 
namespace")
+protected String namespace;
+
+@Parameter(names = "--name", description = "The function's name")
 protected String functionName;
+
+@Override
+void processArguments() throws Exception {
+if (null != fqfn) {
+String[] fqfnArray = fqfn.split("/");
+if (fqfnArray.length != 3) {
+throw new IllegalArgumentException("Fully qualified 
function names (FQFNs) must be of the form tenant/namespace/name");
+}
+tenant = fqfnArray[0];
 
 Review comment:
   better to throw exception if tenant/namespace/functions are inconsistent.
   
   e.g. `-tenant t1 -fqfn t2/namespace/name`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1529: Fix delete topic check and provide better error message.

2018-04-09 Thread GitBox
sijie commented on a change in pull request #1529: Fix delete topic check and 
provide better error message.
URL: https://github.com/apache/incubator-pulsar/pull/1529#discussion_r180237264
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 ##
 @@ -471,7 +471,8 @@ protected void internalDeleteTopic(boolean authoritative) {
 if (topic.isReplicated()) {
 // Delete is disallowed on global topic
 log.error("[{}] Delete topic is forbidden on global namespace {}", 
clientAppId(), topicName);
-throw new RestException(Status.FORBIDDEN, "Delete forbidden on 
global namespace");
+throw new RestException(Status.FORBIDDEN, "Delete forbidden topic 
is replicated on clusters " +
 
 Review comment:
   nit: it might be good to keep the error message consistent between exception 
and the logging statement above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1531: remove unnecessary proto def in FunctionsConfig

2018-04-09 Thread GitBox
jerrypeng opened a new pull request #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng opened a new pull request #1530: remove unused proto declaration

2018-04-09 Thread GitBox
jerrypeng opened a new pull request #1530: remove unused proto declaration
URL: https://github.com/apache/incubator-pulsar/pull/1530
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1519: Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps

2018-04-09 Thread GitBox
merlimat closed pull request #1519: Avoid contention in ManagedCursorImpl 
generated by locking on pendingMarkDeleteOps
URL: https://github.com/apache/incubator-pulsar/pull/1519
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 3ee7703994..05de7fea05 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -43,12 +43,12 @@
   pulsar-common
   ${project.version}
 
-
+
 
   com.google.guava
   guava
 
-
+
 
   org.apache.zookeeper
   zookeeper
@@ -78,6 +78,11 @@
   slf4j-api
 
 
+
+  org.jctools
+  jctools-core
+
+
 
   org.mockito
   mockito-core
@@ -107,7 +112,7 @@
   
 
   
- 
+
   
 
   protobuf
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 462428e1c2..443a873ca7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -38,7 +38,6 @@
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
 
-import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -82,6 +81,7 @@
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.commons.lang3.tuple.Pair;
+import org.jctools.queues.MpmcArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +148,8 @@ public MarkDeleteEntry(PositionImpl newPosition, 
Map properties,
 }
 }
 
-private final ArrayDeque pendingMarkDeleteOps = new 
ArrayDeque<>();
+private final MpmcArrayQueue pendingMarkDeleteOps = new 
MpmcArrayQueue<>(16);
+
 private static final AtomicIntegerFieldUpdater 
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
 AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, 
"pendingMarkDeletedSubmittedCount");
 @SuppressWarnings("unused")
@@ -758,14 +759,12 @@ protected void internalResetCursor(PositionImpl position, 
AsyncCallbacks.ResetCu
 
 log.info("[{}] Initiate reset position to {} on cursor {}", 
ledger.getName(), position, name);
 
-synchronized (pendingMarkDeleteOps) {
-if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, 
TRUE)) {
-log.error("[{}] reset requested - position [{}], previous 
reset in progress - cursor {}",
-ledger.getName(), position, name);
-resetCursorCallback.resetFailed(
-new 
ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in 
progress"),
-position);
-}
+if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, 
TRUE)) {
+log.error("[{}] reset requested - position [{}], previous reset in 
progress - cursor {}", ledger.getName(),
+position, name);
+resetCursorCallback.resetFailed(
+new 
ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in 
progress"),
+position);
 }
 
 final AsyncCallbacks.ResetCursorCallback callback = 
resetCursorCallback;
@@ -805,24 +804,20 @@ public void operationComplete() {
 } finally {
 lock.writeLock().unlock();
 }
-synchronized (pendingMarkDeleteOps) {
-pendingMarkDeleteOps.clear();
-if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
-log.error("[{}] expected reset position [{}], but 
another reset in progress on cursor {}",
-ledger.getName(), newPosition, name);
-}
+
+pendingMarkDeleteOps.drain(entry -> 
entry.callback.markDeleteComplete(entry.ctx));
+if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
+log.error("[{}] expected reset position [{}], but another 
reset in progress on cursor {}",
+ledger.getName(), newPosition, name);
 }
 callback.resetComplete(newPosition);
-
 }
 
 @Override
 public void operationFailed(ManagedLedgerException exception) {
-synchronized 

[GitHub] cckellogg opened a new pull request #1529: Fix delete topic check and provide better error message.

2018-04-09 Thread GitBox
cckellogg opened a new pull request #1529: Fix delete topic check and provide 
better error message.
URL: https://github.com/apache/incubator-pulsar/pull/1529
 
 
   Fix topic replication check and provide a better error message for delete 
topic.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli)

2018-04-09 Thread GitBox
sijie commented on issue #1059: Issue 1014: Rename "global zookeeper" to 
"configuration-store"(change in code, conf and cli)
URL: https://github.com/apache/incubator-pulsar/pull/1059#issuecomment-379873489
 
 
   @merlimat can we move forward with this change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1137: Schema registry

2018-04-09 Thread GitBox
sijie closed pull request #1137: Schema registry
URL: https://github.com/apache/incubator-pulsar/pull/1137
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1137: Schema registry

2018-04-09 Thread GitBox
sijie commented on issue #1137: Schema registry
URL: https://github.com/apache/incubator-pulsar/pull/1137#issuecomment-379872308
 
 
   awesome work! @mgodave 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1522: Reduce the contention on DispatcherSingleActiveConsumer by using a co…

2018-04-09 Thread GitBox
sijie commented on issue #1522: Reduce the contention on 
DispatcherSingleActiveConsumer by using a co…
URL: https://github.com/apache/incubator-pulsar/pull/1522#issuecomment-379871315
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1520: Pulsar Connect

2018-04-09 Thread GitBox
sijie commented on issue #1520: Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1520#issuecomment-379871011
 
 
   @merlimat can you review this again? @srkukarni addressed your comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Removed contention between producers on ManagedLedger addEntry (#1521)

2018-04-09 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new ad90ac6  Removed contention between producers on ManagedLedger 
addEntry (#1521)
ad90ac6 is described below

commit ad90ac6463681f7358f3f331a8440dbcfcd34258
Author: Matteo Merli 
AuthorDate: Mon Apr 9 12:43:01 2018 -0700

Removed contention between producers on ManagedLedger addEntry (#1521)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++
 .../org/apache/bookkeeper/mledger/impl/OpReadEntry.java   |  2 +-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c96b35e..f6c7e3f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -479,7 +479,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 @Override
-public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback 
callback, Object ctx) {
+public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, 
Object ctx) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
 }
@@ -498,6 +498,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
 pendingAddEntries.add(addOperation);
 
+// Jump to specific thread to avoid contention from writers writing 
from different threads
+executor.executeOrdered(name, safeRun(() -> {
+internalAsyncAddEntry(addOperation);
+}));
+}
+
+private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
 if (state == State.ClosingLedger || state == State.CreatingLedger) {
 // We don't have a ready ledger to write into
 // We are waiting for a new ledger to be created
@@ -509,7 +516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 if (now < lastLedgerCreationFailureTimestamp + 
WaitTimeAfterLedgerCreationFailureMs) {
 // Deny the write request, since we haven't waited enough time 
since last attempt to create a new ledger
 pendingAddEntries.remove(addOperation);
-callback.addFailed(new ManagedLedgerException("Waiting for new 
ledger creation to complete"), ctx);
+addOperation.failed(new ManagedLedgerException("Waiting for 
new ledger creation to complete"));
 return;
 }
 
@@ -521,7 +528,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
 mbean.startDataLedgerCreateOp();
 bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, ctx,
+config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, null,
 Collections.emptyMap());
 }
 } else {
@@ -531,7 +538,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 addOperation.setLedger(currentLedger);
 
 ++currentLedgerEntries;
-currentLedgerSize += buffer.readableBytes();
+currentLedgerSize += addOperation.data.readableBytes();
 
 if (log.isDebugEnabled()) {
 log.debug("[{}] Write into current ledger lh={} entries={}", 
name, currentLedger.getId(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 6b3a03a..0dfa338 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback {
 // The reading was already completed, release resources and 
trigger callback
 cursor.readOperationCompleted();
 
-cursor.ledger.getExecutor().execute(safeRun(() -> {
+
cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() 
-> {
 callback.readEntriesComplete(entries, ctx);
 recycle();
 }));


[GitHub] sijie closed pull request #1521: Removed contention between producers on ManagedLedger addEntry

2018-04-09 Thread GitBox
sijie closed pull request #1521: Removed contention between producers on 
ManagedLedger addEntry
URL: https://github.com/apache/incubator-pulsar/pull/1521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c96b35e665..f6c7e3ffd9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -479,7 +479,7 @@ public void asyncAddEntry(final byte[] data, int offset, 
int length, final AddEn
 }
 
 @Override
-public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback 
callback, Object ctx) {
+public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, 
Object ctx) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
 }
@@ -498,6 +498,13 @@ public synchronized void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback
 OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
 pendingAddEntries.add(addOperation);
 
+// Jump to specific thread to avoid contention from writers writing 
from different threads
+executor.executeOrdered(name, safeRun(() -> {
+internalAsyncAddEntry(addOperation);
+}));
+}
+
+private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
 if (state == State.ClosingLedger || state == State.CreatingLedger) {
 // We don't have a ready ledger to write into
 // We are waiting for a new ledger to be created
@@ -509,7 +516,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback
 if (now < lastLedgerCreationFailureTimestamp + 
WaitTimeAfterLedgerCreationFailureMs) {
 // Deny the write request, since we haven't waited enough time 
since last attempt to create a new ledger
 pendingAddEntries.remove(addOperation);
-callback.addFailed(new ManagedLedgerException("Waiting for new 
ledger creation to complete"), ctx);
+addOperation.failed(new ManagedLedgerException("Waiting for 
new ledger creation to complete"));
 return;
 }
 
@@ -521,7 +528,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback
 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
 mbean.startDataLedgerCreateOp();
 bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, ctx,
+config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, null,
 Collections.emptyMap());
 }
 } else {
@@ -531,7 +538,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback
 addOperation.setLedger(currentLedger);
 
 ++currentLedgerEntries;
-currentLedgerSize += buffer.readableBytes();
+currentLedgerSize += addOperation.data.readableBytes();
 
 if (log.isDebugEnabled()) {
 log.debug("[{}] Write into current ledger lh={} entries={}", 
name, currentLedger.getId(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 6b3a03adbb..0dfa33844e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -139,7 +139,7 @@ void checkReadCompletion() {
 // The reading was already completed, release resources and 
trigger callback
 cursor.readOperationCompleted();
 
-cursor.ledger.getExecutor().execute(safeRun(() -> {
+
cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() 
-> {
 callback.readEntriesComplete(entries, ctx);
 recycle();
 }));


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview

2018-04-09 Thread GitBox
sijie commented on issue #1503: Pulsar Functions triggering overview
URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379870448
 
 
   @srkukarni can you review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1525: Compaction considers messages with empty payload as deleting the key

2018-04-09 Thread GitBox
sijie closed pull request #1525: Compaction considers messages with empty 
payload as deleting the key
URL: https://github.com/apache/incubator-pulsar/pull/1525
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4e628bc5f8..9ee31acf42 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -113,7 +113,8 @@ public static boolean isBatch(RawMessage msg) {
 messagesRetained++;
 
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
   
singleMessagePayload, batchBuffer);
-} else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) {
+} else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)
+   && singleMessagePayload.readableBytes() > 0) {
 messagesRetained++;
 
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
   
singleMessagePayload, batchBuffer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index fbad47ea30..2eaa8d0c9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.Commands;
@@ -122,9 +123,9 @@ private void phaseOneLoop(RawReader reader,
  id, ioe);
 }
 } else {
-String key = extractKey(m);
-if (key != null) {
-latestForKey.put(key, id);
+Pair keyAndSize = 
extractKeyAndSize(m);
+if (keyAndSize != null) {
+latestForKey.put(keyAndSize.getLeft(), id);
 }
 }
 
@@ -214,10 +215,11 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map
 messageToAdd = Optional.of(m);
 }
 } else {
-String key = extractKey(m);
-if (key == null) { // pass through messages without a 
key
+Pair keyAndSize = extractKeyAndSize(m);
+if (keyAndSize == null) { // pass through messages 
without a key
 messageToAdd = Optional.of(m);
-} else if (latestForKey.get(key).equals(id)) {
+} else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
+   && keyAndSize.getRight() > 0) {
 messageToAdd = Optional.of(m);
 } else {
 m.close();
@@ -307,11 +309,11 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map
 return bkf;
 }
 
-private static String extractKey(RawMessage m) {
+private static Pair extractKeyAndSize(RawMessage m) {
 ByteBuf headersAndPayload = m.getHeadersAndPayload();
 MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
 if (msgMetadata.hasPartitionKey()) {
-return msgMetadata.getPartitionKey();
+return Pair.of(msgMetadata.getPartitionKey(), 
headersAndPayload.readableBytes());
 } else {
 return null;
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index a0f0f972e4..22e74f21a9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -512,4 

[incubator-pulsar] branch master updated: Compaction considers messages with empty payload as deleting the key (#1525)

2018-04-09 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 951d4d3  Compaction considers messages with empty payload as deleting 
the key (#1525)
951d4d3 is described below

commit 951d4d3805ad0b7365aa83bfb5001c6513ddff68
Author: Ivan Kelly 
AuthorDate: Mon Apr 9 21:39:25 2018 +0200

Compaction considers messages with empty payload as deleting the key (#1525)

If the latest message with a key has an empty payload, compaction
will take this to mean that the key has been deleted, so it will not
be stored in the compacted topic ledger.

This patch also introduces empty messages, which were not previously
possible.
---
 .../pulsar/client/impl/RawBatchConverter.java  |  3 +-
 .../pulsar/compaction/TwoPhaseCompactor.java   | 18 +++---
 .../apache/pulsar/compaction/CompactionTest.java   | 68 ++
 .../pulsar/client/impl/MessageBuilderImpl.java |  4 +-
 4 files changed, 82 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4e628bc..9ee31ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -113,7 +113,8 @@ public class RawBatchConverter {
 messagesRetained++;
 
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
   
singleMessagePayload, batchBuffer);
-} else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) {
+} else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)
+   && singleMessagePayload.readableBytes() > 0) {
 messagesRetained++;
 
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
   
singleMessagePayload, batchBuffer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index fbad47e..2eaa8d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.Commands;
@@ -122,9 +123,9 @@ public class TwoPhaseCompactor extends Compactor {
  id, ioe);
 }
 } else {
-String key = extractKey(m);
-if (key != null) {
-latestForKey.put(key, id);
+Pair keyAndSize = 
extractKeyAndSize(m);
+if (keyAndSize != null) {
+latestForKey.put(keyAndSize.getLeft(), id);
 }
 }
 
@@ -214,10 +215,11 @@ public class TwoPhaseCompactor extends Compactor {
 messageToAdd = Optional.of(m);
 }
 } else {
-String key = extractKey(m);
-if (key == null) { // pass through messages without a 
key
+Pair keyAndSize = extractKeyAndSize(m);
+if (keyAndSize == null) { // pass through messages 
without a key
 messageToAdd = Optional.of(m);
-} else if (latestForKey.get(key).equals(id)) {
+} else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
+   && keyAndSize.getRight() > 0) {
 messageToAdd = Optional.of(m);
 } else {
 m.close();
@@ -307,11 +309,11 @@ public class TwoPhaseCompactor extends Compactor {
 return bkf;
 }
 
-private static String extractKey(RawMessage m) {
+private static Pair extractKeyAndSize(RawMessage m) {
 ByteBuf headersAndPayload = m.getHeadersAndPayload();
 

[GitHub] ivankelly commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true

2018-04-09 Thread GitBox
ivankelly commented on issue #1517: Reader#hasMessageAvailable can report false 
when it should be true
URL: 
https://github.com/apache/incubator-pulsar/issues/1517#issuecomment-379854893
 
 
   The fix is actually pretty trivial. ManagedLedger always opens the last 
ledger before creating a new one, so we can just move the initialization of 
lastConfirmedEntry to the open callback.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)

2018-04-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new da1a110  Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)
da1a110 is described below

commit da1a11043c9e4591c6c5c1d8e6434ccc59b913b0
Author: Matteo Merli 
AuthorDate: Mon Apr 9 10:19:00 2018 -0700

Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)

* Use signSafeMod in RoundRobinPartitionMessageRouter

* Added test with mocked clock

* Fixed tests

* Fixed functions test
---
 .../client/impl/PartitionedProducerImpl.java   |  2 +-
 .../impl/RoundRobinPartitionMessageRouterImpl.java | 27 ---
 .../impl/SinglePartitionMessageRouterImpl.java |  4 +-
 .../org/apache/pulsar/client/util/MathUtils.java   | 41 ++
 .../RoundRobinPartitionMessageRouterImplTest.java  | 91 ++
 .../functions/instance/FunctionResultRouter.java   |  9 ++-
 .../instance/FunctionResultRouterTest.java | 31 
 7 files changed, 158 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 26f74cf..96a8254 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -168,7 +168,7 @@ public class PartitionedProducerImpl extends 
ProducerBase {
 
 int partition = routerPolicy.choosePartition(message, topicMetadata);
 checkArgument(partition >= 0 && partition < 
topicMetadata.numPartitions(),
-"Illegal partition index chosen by the message routing 
policy");
+"Illegal partition index chosen by the message routing policy: 
" + partition);
 return producers.get(partition).sendAsync(message);
 }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 87cfd1a..ee8b9a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.time.Clock;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.client.api.HashingScheme;
@@ -46,35 +48,42 @@ public class RoundRobinPartitionMessageRouterImpl extends 
MessageRouterBase {
 private final boolean isBatchingEnabled;
 private final long maxBatchingDelayMs;
 
-@VisibleForTesting
+private final Clock clock;
+
+private static final Clock SYSTEM_CLOCK = Clock.systemUTC();
+
 public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
-int startPtnIdx) {
-this(hashingScheme, startPtnIdx, false, 0);
+int startPtnIdx,
+boolean isBatchingEnabled,
+long maxBatchingDelayMs) {
+this(hashingScheme, startPtnIdx, isBatchingEnabled, 
maxBatchingDelayMs, SYSTEM_CLOCK);
 }
 
 public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
 int startPtnIdx,
 boolean isBatchingEnabled,
-long maxBatchingDelayMs) {
+long maxBatchingDelayMs,
+Clock clock) {
 super(hashingScheme);
 PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
 this.startPtnIdx = startPtnIdx;
 this.isBatchingEnabled = isBatchingEnabled;
 this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+this.clock = clock;
 }
 
 @Override
 public int choosePartition(Message msg, TopicMetadata topicMetadata) {
 // If the message has a key, it supersedes the round robin routing 
policy
 if (msg.hasKey()) {
-return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
+return signSafeMod(hash.makeHash(msg.getKey()), 
topicMetadata.numPartitions());
 }
 
 if (isBatchingEnabled) { // if batching is enabled, choose partition 
on `maxBatchingDelayMs` 

[GitHub] merlimat closed pull request #1523: Use signSafeMod in RoundRobinPartitionMessageRouter

2018-04-09 Thread GitBox
merlimat closed pull request #1523: Use signSafeMod in 
RoundRobinPartitionMessageRouter
URL: https://github.com/apache/incubator-pulsar/pull/1523
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 26f74cf24d..96a82543d5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -168,7 +168,7 @@ public MessageId send(Message message) throws 
PulsarClientException {
 
 int partition = routerPolicy.choosePartition(message, topicMetadata);
 checkArgument(partition >= 0 && partition < 
topicMetadata.numPartitions(),
-"Illegal partition index chosen by the message routing 
policy");
+"Illegal partition index chosen by the message routing policy: 
" + partition);
 return producers.get(partition).sendAsync(message);
 }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 87cfd1a436..ee8b9a0428 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.time.Clock;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.client.api.HashingScheme;
@@ -46,35 +48,42 @@
 private final boolean isBatchingEnabled;
 private final long maxBatchingDelayMs;
 
-@VisibleForTesting
+private final Clock clock;
+
+private static final Clock SYSTEM_CLOCK = Clock.systemUTC();
+
 public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
-int startPtnIdx) {
-this(hashingScheme, startPtnIdx, false, 0);
+int startPtnIdx,
+boolean isBatchingEnabled,
+long maxBatchingDelayMs) {
+this(hashingScheme, startPtnIdx, isBatchingEnabled, 
maxBatchingDelayMs, SYSTEM_CLOCK);
 }
 
 public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
 int startPtnIdx,
 boolean isBatchingEnabled,
-long maxBatchingDelayMs) {
+long maxBatchingDelayMs,
+Clock clock) {
 super(hashingScheme);
 PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
 this.startPtnIdx = startPtnIdx;
 this.isBatchingEnabled = isBatchingEnabled;
 this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+this.clock = clock;
 }
 
 @Override
 public int choosePartition(Message msg, TopicMetadata topicMetadata) {
 // If the message has a key, it supersedes the round robin routing 
policy
 if (msg.hasKey()) {
-return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
+return signSafeMod(hash.makeHash(msg.getKey()), 
topicMetadata.numPartitions());
 }
 
 if (isBatchingEnabled) { // if batching is enabled, choose partition 
on `maxBatchingDelayMs` boundary.
-long currentMs = System.currentTimeMillis();
-return (((int) (currentMs / maxBatchingDelayMs)) + startPtnIdx) % 
topicMetadata.numPartitions();
+long currentMs = clock.millis();
+return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, 
topicMetadata.numPartitions());
 } else {
-return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & 
Integer.MAX_VALUE) % topicMetadata.numPartitions());
+return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), 
topicMetadata.numPartitions());
 }
 }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index 9cef8b55ae..aacbe4cda4 100644
--- 

[GitHub] lucperkins opened a new pull request #1528: Fix for FQFN flag

2018-04-09 Thread GitBox
lucperkins opened a new pull request #1528: Fix for FQFN flag
URL: https://github.com/apache/incubator-pulsar/pull/1528
 
 
   I realized when trying to create a function that the `--fqfn` flag was added 
to the wrong command base. It doesn't make sense to make it available for 
namespace-level commands. Only function-level commands should have it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic

2018-04-09 Thread GitBox
merlimat commented on issue #1527: Reduce un-necessary work when doing bundle 
lookup for partitioned topic
URL: https://github.com/apache/incubator-pulsar/pull/1527#issuecomment-379817560
 
 
   @zhaijack I need to look well into this change, though I think it looks good.
   
   I believe since adding to the ownership map and the linked list are not 
atomic there still might be some operations done multiple times. 
   
   One other option could be to convert the `ownedBundlesCache` in 
`OwnershipCache` to use a `ConcurrentHashMap`. 
   
   That's the same trick we use to load topic, ensuring only the first attempt 
triggers the load and subsequent attempts will just piggyback on it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1526: make connection timeout config-able for consumer and producer

2018-04-09 Thread GitBox
merlimat commented on issue #1526: make connection timeout config-able for 
consumer and producer
URL: https://github.com/apache/incubator-pulsar/pull/1526#issuecomment-379815567
 
 
   @zhaijack That's not exactly a "timeout" though. That would be the maximum 
backoff time. The `Backoff` class has the exponential time increase logic. It 
starts at 100ms and keep doubling the retry time between attempts until 
reaching 1min. After that it keeps trying to reconnect every 1min.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true

2018-04-09 Thread GitBox
zhaijack commented on issue #1517: Reader#hasMessageAvailable can report false 
when it should be true
URL: 
https://github.com/apache/incubator-pulsar/issues/1517#issuecomment-379806318
 
 
   Thanks @ivankelly , seems not considered this situation before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack opened a new pull request #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic

2018-04-09 Thread GitBox
zhaijack opened a new pull request #1527: Reduce un-necessary work when doing 
bundle lookup for partitioned topic
URL: https://github.com/apache/incubator-pulsar/pull/1527
 
 
   ### Motivation

   For partitioned topic, when create producer or consumer for each partition 
concurrently, it will call `pularclient.getConnection` and will finally call 
into `namespace.searchForCandidateBroker` to acquire target bundle. If this 
bundle contains more then one partitioned topic, then each of them will call 
the bundle related handling(acquire ownership, loadNamespaceTopics), this waste 
some resource.
   This change tries to let the first partition topic do the real work once, 
and do the piggy back
   for other concurrent topics handling.
   
   e.g.
   ```
Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 
01:43:25.301 [pulsar-1-17] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership 
of benchmark/ local/ns-CeV_hcU/0x1400_0x1800
   .
   .
Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 
01:43:25.301 [pulsar-1-10] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership 
of benchmark/ local/ns-CeV_hcU/0x1400_0x1800
Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 
01:43:25.301 [pulsar-1-15] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership 
of benchmark/ local/ns-CeV_hcU/0x1400_0x1800
Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 
01:43:25.304 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache -Successfully 
acquired ownership of 
/namespace/benchmark/local/ns-CeV_hcU/0x1400_0x1800
Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 
01:43:25.304 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache -Successfully 
acquired ownership of 
/namespace/benchmark/local/ns-CeV_hcU/0x1400_0x1800
   ..
   ``` 
   
   ### Modifications
   
   This change tries to let the first partition topic do the real work only 
once, and do the piggy back
   for other concurrent topics handling.
   
   ### Result
   
   getConnection for partitioned consumer and producer would be more efficient.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on issue #1137: Schema registry

2018-04-09 Thread GitBox
mgodave commented on issue #1137: Schema registry
URL: https://github.com/apache/incubator-pulsar/pull/1137#issuecomment-379797837
 
 
   All changes are merged, this tracking ticket can be closed or merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitH b] zhaijack opened a new p ll req est #1526: make connection timeo t config-able for cons mer and prod cer

2018-04-09 Thread GitBox
zhaijack opened a new pull request #1526: make connection timeout config-able 
for consumer and producer
URL: https://github.com/apache/incubator-pulsar/pull/1526
 
 
   ### Motivation
   
   In ConsumerImpl and ProducerImpl,  the max timeout for connection is hard 
code as 60 seconds.
   This change try to make it config-able, and use value of 
client.configuration.getOperationTimeoutMs
   
   ### Modifications
   
   change hard code 60s into client.configuration.getOperationTimeoutMs
   
   ### Result
   
   connection timeout could be config-able.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitH b] ivankelly commented on iss e #1513: Managed ledger ses ReadHandle in read path

2018-04-09 Thread GitBox
ivankelly commented on issue #1513: Managed ledger uses ReadHandle in read path
URL: https://github.com/apache/incubator-pulsar/pull/1513#issuecomment-379789804
 
 
   retest this please // BasicEndToEndTest.testStatsLatencies


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitH b] maskit commented on iss e #1266: Pass all Apache Podling Website Checks

2018-04-09 Thread GitBox
maskit commented on issue #1266: Pass all Apache Podling Website Checks
URL: 
https://github.com/apache/incubator-pulsar/issues/1266#issuecomment-379776346
 
 
   I got an email on another project, and it encourages to add ApacheCon 
promotion to our project site.
   http://apache.org/events/README.txt
   
   I think changing the URL for *our* current-event-page to an absolute URL is 
not a good idea because it doesn't work on a local copy, but putting another 
link for ApacheCon as an absolute URL would be a good idea to make the check 
result green.
   
   The logo will be updated automatically and will show the next event, so we 
can put the logo on our site permanently (no need to put, remove, put, remove, 
...).
   
   Where should we put the logo? While I think it should be above the fold, I 
don't want to break current cool design.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitH b] ivankelly commented on iss e #1525: Compaction considers messages with empty payload as deleting the key

2018-04-09 Thread GitBox
ivankelly commented on issue #1525: Compaction considers messages with empty 
payload as deleting the key
URL: https://github.com/apache/incubator-pulsar/pull/1525#issuecomment-379761575
 
 
   retest this please // 3 flakes
   // BrokerServiceLookupTest.testModularLoadManagerSplitBundle
   // BrokerClientIntegrationTest.testResetCursor
   // PatternTopicsConsumerImplTest.testAutoUnbubscribePatternConsumer


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly commented on a change in pull request #1513: Managed ledger uses ReadHandle in read path

2018-04-09 Thread GitBox
ivankelly commented on a change in pull request #1513: Managed ledger uses 
ReadHandle in read path
URL: https://github.com/apache/incubator-pulsar/pull/1513#discussion_r180074490
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
 ##
 @@ -239,43 +243,44 @@ public void asyncReadEntry(LedgerHandle lh, long 
firstEntry, long lastEntry, boo
 }
 
 // Read all the entries from bookkeeper
-lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) 
-> {
-
-if (rc != BKException.Code.OK) {
-if (rc == BKException.Code.TooManyRequestsException) {
-
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
-} else {
-ml.invalidateLedgerHandle(lh1, rc);
-ManagedLedgerException mlException = 
createManagedLedgerException(rc);
-callback.readEntriesFailed(mlException, ctx);
-}
-return;
-}
-
-checkNotNull(ml.getName());
-checkNotNull(ml.getExecutor());
-ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
-// We got the entries, we need to transform them to a 
List<> type
-long totalSize = 0;
-final List entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-while (sequence.hasMoreElements()) {
-// Insert the entries at the end of the list (they 
will be unsorted for now)
-LedgerEntry ledgerEntry = sequence.nextElement();
-EntryImpl entry = EntryImpl.create(ledgerEntry);
-ledgerEntry.getEntryBuffer().release();
-
-entriesToReturn.add(entry);
-
-totalSize += entry.getLength();
-
-}
-
-
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-ml.getMBean().addReadEntriesSample(entriesToReturn.size(), 
totalSize);
-
-callback.readEntriesComplete((List) entriesToReturn, ctx);
-}));
-}, callback);
+lh.readAsync(firstEntry, lastEntry).whenComplete(
+(ledgerEntries, exception) -> {
+if (exception != null) {
+if (exception instanceof BKException
+&& ((BKException)exception).getCode() == 
BKException.Code.TooManyRequestsException) {
+
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+} else {
+ml.invalidateLedgerHandle(lh, exception);
+ManagedLedgerException mlException = 
createManagedLedgerException(exception);
+callback.readEntriesFailed(mlException, ctx);
+}
+return;
+}
+
+checkNotNull(ml.getName());
+checkNotNull(ml.getExecutor());
+ml.getExecutor().executeOrdered(ml.getName(), 
safeRun(() -> {
 
 Review comment:
   Didn't occur to me when I was first doing it, but done now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch asf-site updated: Updated site at revision 8ad606b

2018-04-09 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new d3a6fac  Updated site at revision 8ad606b
d3a6fac is described below

commit d3a6facc7fc84f5c8a05035ddfb82039ed8ae6f7
Author: jenkins 
AuthorDate: Mon Apr 9 08:22:22 2018 +

Updated site at revision 8ad606b
---
 content/docs/latest/adaptors/PulsarSpark/index.html| 2 +-
 content/docs/latest/adaptors/PulsarStorm/index.html| 2 +-
 content/docs/latest/admin-api/overview/index.html  | 8 
 content/docs/latest/admin/Authz/index.html | 4 ++--
 content/docs/latest/clients/Java/index.html| 2 +-
 content/docs/latest/cookbooks/PartitionedTopics/index.html | 2 +-
 content/docs/latest/cookbooks/RetentionExpiry/index.html   | 2 +-
 content/docs/latest/cookbooks/message-deduplication/index.html | 2 +-
 content/docs/latest/deployment/aws-cluster/index.html  | 2 +-
 content/docs/latest/deployment/cluster/index.html  | 2 +-
 content/docs/latest/deployment/instance/index.html | 2 +-
 content/docs/latest/reference/CliTools/index.html  | 4 ++--
 12 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/content/docs/latest/adaptors/PulsarSpark/index.html 
b/content/docs/latest/adaptors/PulsarSpark/index.html
index 91d64a2..05093a7 100644
--- a/content/docs/latest/adaptors/PulsarSpark/index.html
+++ b/content/docs/latest/adaptors/PulsarSpark/index.html
@@ -1421,9 +1421,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/adaptors/PulsarStorm/index.html 
b/content/docs/latest/adaptors/PulsarStorm/index.html
index ef328f8..bdcde4e 100644
--- a/content/docs/latest/adaptors/PulsarStorm/index.html
+++ b/content/docs/latest/adaptors/PulsarStorm/index.html
@@ -1211,9 +1211,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/admin-api/overview/index.html 
b/content/docs/latest/admin-api/overview/index.html
index 3cf7922..b5f413b 100644
--- a/content/docs/latest/admin-api/overview/index.html
+++ b/content/docs/latest/admin-api/overview/index.html
@@ -1009,9 +1009,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
@@ -1221,9 +1221,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
@@ -1431,9 +1431,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
@@ -1649,9 +1649,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/admin/Authz/index.html 
b/content/docs/latest/admin/Authz/index.html
index 7879561..bd75e25 100644
--- a/content/docs/latest/admin/Authz/index.html
+++ b/content/docs/latest/admin/Authz/index.html
@@ -1009,9 +1009,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
@@ -2067,9 +2067,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/clients/Java/index.html 
b/content/docs/latest/clients/Java/index.html
index 4ccd584..edb9c89 100644
--- a/content/docs/latest/clients/Java/index.html
+++ b/content/docs/latest/clients/Java/index.html
@@ -1217,9 +1217,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/cookbooks/PartitionedTopics/index.html 
b/content/docs/latest/cookbooks/PartitionedTopics/index.html
index f26ae01..39301b5 100644
--- a/content/docs/latest/cookbooks/PartitionedTopics/index.html
+++ b/content/docs/latest/cookbooks/PartitionedTopics/index.html
@@ -1429,9 +1429,9 @@
   
   
   
-  The Pulsar admin 
interface
   
   
+  The Pulsar admin 
interface
   
   
   
diff --git a/content/docs/latest/cookbooks/RetentionExpiry/index.html