[incubator-pulsar] branch master updated: Added Kafka Source and Kafka Sink to Pulsar Connect (#1557)

2018-04-12 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 616e2a2  Added Kafka Source and Kafka Sink to Pulsar Connect (#1557)
616e2a2 is described below

commit 616e2a278f11161aa91689cf20eecef9c572a3b9
Author: Sanjeev Kulkarni 
AuthorDate: Thu Apr 12 21:28:40 2018 -0700

Added Kafka Source and Kafka Sink to Pulsar Connect (#1557)

* Added Kafka Source and Kafka Sink to Pulsar Connect

* Standardize on kafka versions for compat and connect
---
 pom.xml|   1 +
 .../pulsar-client-kafka/pom.xml|   6 +-
 pulsar-connect/kafka/pom.xml   |  67 +++
 .../org/apache/pulsar/connect/kafka/KafkaSink.java |  93 +++
 .../pulsar/connect/kafka/KafkaSinkConfig.java  |  59 ++
 .../apache/pulsar/connect/kafka/KafkaSource.java   | 126 +
 .../pulsar/connect/kafka/KafkaSourceConfig.java|  61 ++
 pulsar-connect/pom.xml |   1 +
 8 files changed, 409 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index c67b742..26dced5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client 
API.
 2.2.0
 3.4.0
 4.1.5
+0.10.2.1
 5.1.1
 
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 978058d..5c3ee5d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -37,10 +37,6 @@
   Drop-in replacement for Kafka client library that publishes and 
consumes
   messages on Pulsar topics
 
-  
-0.10.2.1
-  
-
   
 
   ${project.groupId}
@@ -51,7 +47,7 @@
 
   org.apache.kafka
   kafka-clients
-  ${kafka.version}
+  ${kafka-client.version}
   
 
   net.jpountz.lz4
diff --git a/pulsar-connect/kafka/pom.xml b/pulsar-connect/kafka/pom.xml
new file mode 100644
index 000..cb08890
--- /dev/null
+++ b/pulsar-connect/kafka/pom.xml
@@ -0,0 +1,67 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-kafka
+  Pulsar Connect :: Kafka
+
+  
+
+
+  ${project.groupId}
+  pulsar-connect-core
+  ${project.version}
+
+
+
+  ${project.groupId}
+  pulsar-common
+  ${project.version}
+
+
+
+  com.fasterxml.jackson.core
+  jackson-databind
+  ${jackson.version}
+
+
+
+  com.fasterxml.jackson.dataformat
+  jackson-dataformat-yaml
+  ${jackson.version}
+
+
+
+  org.apache.kafka
+  kafka-clients
+  ${kafka-client.version}
+
+
+  
+
+
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
new file mode 100644
index 000..1f24309
--- /dev/null
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * 

[GitHub] merlimat closed pull request #1557: Added Kafka Source and Kafka Sink to Pulsar Connect

2018-04-12 Thread GitBox
merlimat closed pull request #1557: Added Kafka Source and Kafka Sink to Pulsar 
Connect
URL: https://github.com/apache/incubator-pulsar/pull/1557
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index c67b742d62..26dced5550 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client 
API.
 2.2.0
 3.4.0
 4.1.5
+0.10.2.1
 5.1.1
 
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 978058ddfd..5c3ee5db54 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -37,10 +37,6 @@
   Drop-in replacement for Kafka client library that publishes and 
consumes
   messages on Pulsar topics
 
-  
-0.10.2.1
-  
-
   
 
   ${project.groupId}
@@ -51,7 +47,7 @@
 
   org.apache.kafka
   kafka-clients
-  ${kafka.version}
+  ${kafka-client.version}
   
 
   net.jpountz.lz4
diff --git a/pulsar-connect/kafka/pom.xml b/pulsar-connect/kafka/pom.xml
new file mode 100644
index 00..cb088901ab
--- /dev/null
+++ b/pulsar-connect/kafka/pom.xml
@@ -0,0 +1,67 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-kafka
+  Pulsar Connect :: Kafka
+
+  
+
+
+  ${project.groupId}
+  pulsar-connect-core
+  ${project.version}
+
+
+
+  ${project.groupId}
+  pulsar-common
+  ${project.version}
+
+
+
+  com.fasterxml.jackson.core
+  jackson-databind
+  ${jackson.version}
+
+
+
+  com.fasterxml.jackson.dataformat
+  jackson-dataformat-yaml
+  ${jackson.version}
+
+
+
+  org.apache.kafka
+  kafka-clients
+  ${kafka-client.version}
+
+
+  
+
+
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
new file mode 100644
index 00..1f243099a8
--- /dev/null
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Simple Kafka Sink to publish messages to a Kafka topic
+ */
+public class KafkaSink implements Sink> {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+private Producer producer;
+private Properties props = new Properties();
+private KafkaSinkConfig kafkaSinkConfig;
+
+@Override
+public CompletableFuture write(KeyValue message) {
+ProducerRecord record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), 
message.getValue());
+LOG.debug("Message sending to kafka, record={}.", record);
+Future f = producer.send(record);
+return CompletableFuture.supplyAsync(() -> {
+try {
+f.get();
+return null;
+} catch 

[GitHub] zhaijack commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
zhaijack commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181269469
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 ##
 @@ -246,14 +246,24 @@ ClientBuilder authentication(String authPluginClassName, 
Map aut
 ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
 
 /**
- * Number of concurrent lookup-requests allowed on each broker-connection 
to prevent overload on broker.
+ * Number of concurrent lookup-requests allowed to send on each 
broker-connection to prevent overload on broker.
  * (default: 5000) It should be configured with higher value only 
in case of it requires to produce/subscribe
  * on thousands of topic using created {@link PulsarClient}
  *
  * @param maxConcurrentLookupRequests
  */
 ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
 
+/**
+ * Number of max lookup-requests allowed on each broker-connection to 
prevent overload on broker.
+ * (default: 2) It should not be smaller than 
maxConcurrentLookupRequests.
+ * Requests that inside maxConcurrentLookupRequests already send to 
broker, and requests beyond
+ * maxConcurrentLookupRequests and under maxLookupRequests will wait in 
each client cnx.
+ *
+ * @param maxLookupRequests
 
 Review comment:
   Thanks, seems this class have a Since 2.0.0 in line 30.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat commented on issue #1562: Converted to v2 topic names test related to 
ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#issuecomment-380984325
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat commented on issue #1562: Converted to v2 topic names test related to 
ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#issuecomment-380946821
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1566: Provide unshaded pulsar-kafka-client

2018-04-12 Thread GitBox
merlimat commented on issue #1566: Provide unshaded pulsar-kafka-client
URL: https://github.com/apache/incubator-pulsar/pull/1566#issuecomment-380978713
 
 
   @sijie We should probably update the docs in 
`./site/docs/latest/adaptors/KafkaWrapper.md` to mention this artifact as well. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
merlimat commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380977903
 
 
   > I suppose I could hack in a default "is compatible" check that literally 
checks if they are equal which would require schema updates to happen via the 
REST interface. This would make upgrades hard/impossible to do seamlessly.
   
   I think for JSON we should first check if there's a schema and if the schema 
type is the same as a starter. For evolution, I guess the basic check would be 
to validate there are no type mismatchs. 
   It's typically ok to add/remove fields (provided we have configured Jackson 
to do so - and we should verify that), but changing type would lead to errors 
in deserialization.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1563: Added RabbitMQ source to Pulsar Connect

2018-04-12 Thread GitBox
sijie closed pull request #1563: Added RabbitMQ source to Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index a597861be6..c67b742d62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client 
API.
 2.2.0
 3.4.0
 4.1.5
+5.1.1
 
 
 3.4.0
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 38f9e63858..1b955ac00b 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -36,6 +36,7 @@
 twitter
 cassandra
 aerospike
+rabbitmq
   
 
 
diff --git a/pulsar-connect/rabbitmq/pom.xml b/pulsar-connect/rabbitmq/pom.xml
new file mode 100644
index 00..e4277c7827
--- /dev/null
+++ b/pulsar-connect/rabbitmq/pom.xml
@@ -0,0 +1,67 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-rabbitmq
+  Pulsar Connect :: RabbitMQ
+
+  
+
+
+  ${project.groupId}
+  pulsar-connect-core
+  ${project.version}
+
+
+
+  ${project.groupId}
+  pulsar-common
+  ${project.version}
+
+
+
+  com.fasterxml.jackson.core
+  jackson-databind
+  ${jackson.version}
+
+
+
+  com.fasterxml.jackson.dataformat
+  jackson-dataformat-yaml
+  ${jackson.version}
+
+
+
+  com.rabbitmq
+  amqp-client
+  ${rabbitmq-client.version}
+
+
+  
+
+
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
new file mode 100644
index 00..1d7268e3f3
--- /dev/null
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class RabbitMQConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private String connectionName;
+private String amqUri;
+private String queueName;
+
+public static RabbitMQConfig load(String yamlFile) throws IOException {
+ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
+}
+
+public static RabbitMQConfig load(Map map) throws 
IOException {
+ObjectMapper mapper = new ObjectMapper();
+return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
RabbitMQConfig.class);
+}
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
new file mode 100644
index 00..59ce73ba72
--- /dev/null
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in 

[incubator-pulsar] branch master updated: Added RabbitMQ source to Pulsar Connect (#1563)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 9f62c3a  Added RabbitMQ source to Pulsar Connect (#1563)
9f62c3a is described below

commit 9f62c3a29ba4da541f338d86572f55c332208283
Author: Sanjeev Kulkarni 
AuthorDate: Thu Apr 12 16:23:31 2018 -0700

Added RabbitMQ source to Pulsar Connect (#1563)
---
 pom.xml|  1 +
 pulsar-connect/pom.xml |  1 +
 pulsar-connect/rabbitmq/pom.xml| 67 
 .../pulsar/connect/rabbitmq/RabbitMQConfig.java| 55 +
 .../pulsar/connect/rabbitmq/RabbitMQSource.java| 89 ++
 5 files changed, 213 insertions(+)

diff --git a/pom.xml b/pom.xml
index a597861..c67b742 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client 
API.
 2.2.0
 3.4.0
 4.1.5
+5.1.1
 
 
 3.4.0
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 38f9e63..1b955ac 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -36,6 +36,7 @@
 twitter
 cassandra
 aerospike
+rabbitmq
   
 
 
diff --git a/pulsar-connect/rabbitmq/pom.xml b/pulsar-connect/rabbitmq/pom.xml
new file mode 100644
index 000..e4277c7
--- /dev/null
+++ b/pulsar-connect/rabbitmq/pom.xml
@@ -0,0 +1,67 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.pulsar
+pulsar-connect
+2.0.0-incubating-SNAPSHOT
+  
+
+  pulsar-connect-rabbitmq
+  Pulsar Connect :: RabbitMQ
+
+  
+
+
+  ${project.groupId}
+  pulsar-connect-core
+  ${project.version}
+
+
+
+  ${project.groupId}
+  pulsar-common
+  ${project.version}
+
+
+
+  com.fasterxml.jackson.core
+  jackson-databind
+  ${jackson.version}
+
+
+
+  com.fasterxml.jackson.dataformat
+  jackson-dataformat-yaml
+  ${jackson.version}
+
+
+
+  com.rabbitmq
+  amqp-client
+  ${rabbitmq-client.version}
+
+
+  
+
+
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
new file mode 100644
index 000..1d7268e
--- /dev/null
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class RabbitMQConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private String connectionName;
+private String amqUri;
+private String queueName;
+
+public static RabbitMQConfig load(String yamlFile) throws IOException {
+ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
+}
+
+public static RabbitMQConfig load(Map map) throws 
IOException {
+ObjectMapper mapper = new ObjectMapper();
+return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
RabbitMQConfig.class);
+}
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
new file mode 100644
index 000..59ce73b
--- 

[GitHub] sijie opened a new pull request #1566: Provide unshaded pulsar-kafka-client

2018-04-12 Thread GitBox
sijie opened a new pull request #1566: Provide unshaded pulsar-kafka-client
URL: https://github.com/apache/incubator-pulsar/pull/1566
 
 
   Similar as the original pulsar-client. It is good to provide unshaded 
pulsar-kafka-client, so when migrating from kafka to pulsar, there would be a 
change that original kafka client will be colocated with the 
pulsar-kafka-client. Providing an unshaded pulsar-kafka-client will address 
this concern.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1557: Added Kafka Source and Kafka Sink to Pulsar Connect

2018-04-12 Thread GitBox
srkukarni commented on issue #1557: Added Kafka Source and Kafka Sink to Pulsar 
Connect
URL: https://github.com/apache/incubator-pulsar/pull/1557#issuecomment-380969294
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia opened a new pull request #1565: Introduce admin api to get broker and namespace-isolation policy map

2018-04-12 Thread GitBox
rdhabalia opened a new pull request #1565: Introduce admin api to get broker 
and namespace-isolation policy map
URL: https://github.com/apache/incubator-pulsar/pull/1565
 
 
   ### Motivation
   
   Right now, with multiple namespace-isolation-policies, it is little bit 
tricky to find out list of brokers which are part of shared pool by going 
through regex. we also want to know policies attached to every broker for 
isolation policy analysis.
   
   ### Modifications
   
   Introduce admin api 
   1. Get list of brokers with namespace isolation policy attached to them
   2. get specific broker with namespace isolation policy attached to it
   
   ### Result
   
   it will not impact any existing functionality.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1564: Support short topic name in cpp client

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1564: Support short topic name in 
cpp client
URL: https://github.com/apache/incubator-pulsar/pull/1564#discussion_r181237995
 
 

 ##
 File path: pulsar-client-cpp/lib/TopicName.cc
 ##
 @@ -52,8 +52,19 @@ TopicName::TopicName() {}
 bool TopicName::init(const std::string& topicName) {
 topicName_ = topicName;
 if (topicName.find("://") == std::string::npos) {
-LOG_ERROR("Topic name is not valid, domain not present - " << 
topicName);
-return false;
+std::string topicNameCopy_ = topicName;
+std::vector pathTokens;
+boost::algorithm::split(pathTokens, topicNameCopy_, 
boost::algorithm::is_any_of("/"));
+if (pathTokens.size() == 3) {
+  topicName_ = "persistent://" + pathTokens[0] + "/" + pathTokens[1] + 
"/" + pathTokens[2];
 
 Review comment:
   nice catch. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #1564: Support short topic name in cpp client

2018-04-12 Thread GitBox
sijie opened a new pull request #1564: Support short topic name in cpp client
URL: https://github.com/apache/incubator-pulsar/pull/1564
 
 
   This related to #1535 - support short topic name in cpp client.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1547: Make client keepAliveInterval configurable in client side

2018-04-12 Thread GitBox
merlimat closed pull request #1547: Make client keepAliveInterval configurable 
in client side
URL: https://github.com/apache/incubator-pulsar/pull/1547
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index b832e59c9b..418953d5b7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -262,4 +262,12 @@ ClientBuilder authentication(String authPluginClassName, 
Map aut
  * @param maxNumberOfRejectedRequestPerConnection
  */
 ClientBuilder maxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRequestPerConnection);
+
+/**
+ * Set keep alive interval in seconds for each client-broker-connection. 
(default: 30).
+ *
+ * @param keepAliveIntervalSeconds
+ * @param unit time unit for {@code statsInterval}
+ */
+ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit 
unit);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3effc7f3bf..deb06251fb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -155,6 +155,12 @@ public ClientBuilder 
maxNumberOfRejectedRequestPerConnection(int maxNumberOfReje
 return this;
 }
 
+@Override
+public ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, 
TimeUnit unit) {
+
conf.setKeepAliveIntervalSeconds((int)unit.toSeconds(keepAliveIntervalSeconds));
+return this;
+}
+
 public ClientConfigurationData getClientConfigurationData() {
 return conf;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d89d8fbd5a..39aac44799 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -107,7 +107,7 @@
 }
 
 public ClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) {
-super(30, TimeUnit.SECONDS);
+super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
 this.pendingLookupRequestSemaphore = new 
Semaphore(conf.getConcurrentLookupRequest(), true);
 this.authentication = conf.getAuthentication();
 this.eventLoopGroup = eventLoopGroup;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 4fe7569a3b..859bb1b763 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -53,6 +53,7 @@
 private boolean tlsHostnameVerificationEnable = false;
 private int concurrentLookupRequest = 5;
 private int maxNumberOfRejectedRequestPerConnection = 50;
+private int keepAliveIntervalSeconds = 30;
 
 public ClientConfigurationData clone() {
 try {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1563: Added RabbitMQ source to Pulsar Connect

2018-04-12 Thread GitBox
srkukarni commented on issue #1563: Added RabbitMQ source to Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1563#issuecomment-380947768
 
 
   @merlimat @sijie 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed issue #1536: Add "default" tenant & "default" namespace

2018-04-12 Thread GitBox
sijie closed issue #1536: Add "default" tenant & "default" namespace
URL: https://github.com/apache/incubator-pulsar/issues/1536
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Issue #1536: Introduce Short Topic Name (#1535)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 61de30d  Issue #1536: Introduce Short Topic Name (#1535)
61de30d is described below

commit 61de30d09488395e3c19cf9232e755528a52d342
Author: Sijie Guo 
AuthorDate: Thu Apr 12 14:17:33 2018 -0700

Issue #1536: Introduce Short Topic Name (#1535)

This closes #1536
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 52 ++
 .../org/apache/pulsar/PulsarStandaloneStarter.java | 21 -
 .../org/apache/pulsar/common/naming/TopicName.java | 30 ++---
 .../apache/pulsar/common/naming/TopicNameTest.java | 49 +++-
 4 files changed, 134 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index acb5b6d..80e1a5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -20,13 +20,19 @@ package org.apache.pulsar;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -143,8 +149,54 @@ public class PulsarClusterMetadataSetup {
 // Ignore
 }
 
+// Create public tenant
+PropertyAdmin publicProperty = new PropertyAdmin();
+byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
+try {
+ZkUtils.createFullPathOptimistic(
+globalZk,
+POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY,
+publicPropertyDataJson,
+ZooDefs.Ids.OPEN_ACL_UNSAFE,
+CreateMode.PERSISTENT);
+} catch (NodeExistsException e) {
+// Ignore
+}
+
+// Create default namespace
+Policies policies = new Policies();
+policies.bundles = getBundles(4);
+byte[] defaultNamespaceDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
+try {
+ZkUtils.createFullPathOptimistic(
+globalZk,
+POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE,
+defaultNamespaceDataJson,
+ZooDefs.Ids.OPEN_ACL_UNSAFE,
+CreateMode.PERSISTENT);
+} catch (NodeExistsException e) {
+// Ignore
+}
+
 log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);
 }
 
+private static BundlesData getBundles(int numBundles) {
+Long maxVal = ((long) 1) << 32;
+Long segSize = maxVal / numBundles;
+List partitions = Lists.newArrayList();
+partitions.add(String.format("0x%08x", 0l));
+Long curPartition = segSize;
+for (int i = 0; i < numBundles; i++) {
+if (i != numBundles - 1) {
+partitions.add(String.format("0x%08x", curPartition));
+} else {
+partitions.add(String.format("0x%08x", maxVal - 1));
+}
+curPartition += segSize;
+}
+return new BundlesData(partitions);
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 68c307a..a5d0a43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import 

[GitHub] sijie closed pull request #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
sijie closed pull request #1535: Issue #1536: Introduce Short Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index acb5b6db7f..80e1a5c398 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -20,13 +20,19 @@
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -143,8 +149,54 @@ public static void main(String[] args) throws Exception {
 // Ignore
 }
 
+// Create public tenant
+PropertyAdmin publicProperty = new PropertyAdmin();
+byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
+try {
+ZkUtils.createFullPathOptimistic(
+globalZk,
+POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY,
+publicPropertyDataJson,
+ZooDefs.Ids.OPEN_ACL_UNSAFE,
+CreateMode.PERSISTENT);
+} catch (NodeExistsException e) {
+// Ignore
+}
+
+// Create default namespace
+Policies policies = new Policies();
+policies.bundles = getBundles(4);
+byte[] defaultNamespaceDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
+try {
+ZkUtils.createFullPathOptimistic(
+globalZk,
+POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE,
+defaultNamespaceDataJson,
+ZooDefs.Ids.OPEN_ACL_UNSAFE,
+CreateMode.PERSISTENT);
+} catch (NodeExistsException e) {
+// Ignore
+}
+
 log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);
 }
 
+private static BundlesData getBundles(int numBundles) {
+Long maxVal = ((long) 1) << 32;
+Long segSize = maxVal / numBundles;
+List partitions = Lists.newArrayList();
+partitions.add(String.format("0x%08x", 0l));
+Long curPartition = segSize;
+for (int i = 0; i < numBundles; i++) {
+if (i != numBundles - 1) {
+partitions.add(String.format("0x%08x", curPartition));
+} else {
+partitions.add(String.format("0x%08x", maxVal - 1));
+}
+curPartition += segSize;
+}
+return new BundlesData(partitions);
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 68c307aa33..a5d0a43e4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -203,13 +204,14 @@ void start() throws Exception {
 broker = new PulsarService(config, 
Optional.ofNullable(fnWorkerService));
 broker.start();
 
-// Create a sample namespace
 URL webServiceUrl = new URL(
 String.format("http://%s:%d;, config.getAdvertisedAddress(), 

[GitHub] merlimat commented on issue #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat commented on issue #1562: Converted to v2 topic names test related to 
ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#issuecomment-380946821
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on a change in pull request #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on a change in pull request #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#discussion_r181219020
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 ##
 @@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Sets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+private static final Logger log = 
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
+
+@BeforeMethod
+@Override
+protected void setup() throws Exception {
+super.internalSetup();
+super.producerBaseSetup();
+}
+
+@AfterMethod
+@Override
+protected void cleanup() throws Exception {
+super.internalCleanup();
+}
+
+public static class JsonEncodedPojo {
+private String message;
+
+public JsonEncodedPojo() {
+}
+
+public JsonEncodedPojo(String message) {
+this.message = message;
+}
+
+public String getMessage() {
+return message;
+}
+
+public void setMessage(String message) {
+this.message = message;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+JsonEncodedPojo that = (JsonEncodedPojo) o;
+return Objects.equals(message, that.message);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(message);
+}
+
+@Override
+public String toString() {
+return MoreObjects.toStringHelper(this)
+.add("message", message)
+.toString();
+}
+}
+
+@Test
+public void testJsonProducerAndConsumer() throws Exception {
 
 Review comment:
   Is this necessary? We have these tests without typing, what is different 
about these tests besides the schema? Could the differences be tested at a 
different level, ie, not having to instantiate a client/server 
producer/consumer just to test (schema name of a partitioned topic is one that 
comes to mind)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on a change in pull request #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on a change in pull request #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#discussion_r181219020
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 ##
 @@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Sets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+private static final Logger log = 
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
+
+@BeforeMethod
+@Override
+protected void setup() throws Exception {
+super.internalSetup();
+super.producerBaseSetup();
+}
+
+@AfterMethod
+@Override
+protected void cleanup() throws Exception {
+super.internalCleanup();
+}
+
+public static class JsonEncodedPojo {
+private String message;
+
+public JsonEncodedPojo() {
+}
+
+public JsonEncodedPojo(String message) {
+this.message = message;
+}
+
+public String getMessage() {
+return message;
+}
+
+public void setMessage(String message) {
+this.message = message;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+JsonEncodedPojo that = (JsonEncodedPojo) o;
+return Objects.equals(message, that.message);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(message);
+}
+
+@Override
+public String toString() {
+return MoreObjects.toStringHelper(this)
+.add("message", message)
+.toString();
+}
+}
+
+@Test
+public void testJsonProducerAndConsumer() throws Exception {
 
 Review comment:
   Is this necessary? We have these tests without typing, what is different 
about these tests besides the schema? Could the differences be tested at a 
different level (schema name of a partitioned topic is one that comes to mind)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380936892
 
 
   I suppose I could hack in a default "is compatible" check that literally 
checks if they are equal which would require schema updates to happen via the 
REST interface. This would make upgrades hard/impossible to do seamlessly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380928449
 
 
   > * When we attempt to create the producer, we should validate the schema 
this producer is about to start sending is compatible with the last schema we 
have in store, otherwise the producer should be denied.
   > * Same for consumer, if the type is incompatible we should fail the 
consumer creation.
   
   This will have to happen in the broker. At this very moment I have zero idea 
what it means for two JSON schemas to be "compatible", this will take some 
additional research.
   
   > * We should have tests to do basic operations on the JSON schema, eg: get 
the schemas for a topic, delete, create, update
   
   fair enough
   
   > * We should be able to have a check to control whether schema is enforced 
or not for topics of a particular namespace.
   
   "check", do you mean a config parameter?
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1535: Issue #1536: Introduce Short 
Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535#discussion_r181198970
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
 ##
 @@ -240,6 +242,22 @@ void start() throws Exception {
 log.info(e.getMessage());
 }
 
+// Create a public tenant and default namespace
+final String publicTenant = TopicName.PUBLIC_PROPERTY;
+final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE;
+try {
+if (!admin.properties().getProperties().contains(publicTenant)) {
+admin.properties().createProperty(
+publicTenant,
+new 
PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), 
Sets.newHashSet(cluster)));
+}
+if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+admin.namespaces().createNamespace(defaultNamespace);
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1535: Issue #1536: Introduce Short 
Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535#discussion_r181195826
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
 ##
 @@ -143,8 +149,54 @@ public static void main(String[] args) throws Exception {
 // Ignore
 }
 
+// Create public tenant
+PropertyAdmin publicProperty = new PropertyAdmin();
+byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
 
 Review comment:
   created a public/default namespace at standalone


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1440: Update default values for a few publisher settings

2018-04-12 Thread GitBox
sijie commented on issue #1440: Update default values for a few publisher 
settings
URL: https://github.com/apache/incubator-pulsar/pull/1440#issuecomment-380919510
 
 
   managed ledger closed test failed. seems like a flaky test. rebased to 
latest master.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1535: Issue #1536: Introduce 
Short Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535#discussion_r181197109
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
 ##
 @@ -240,6 +242,22 @@ void start() throws Exception {
 log.info(e.getMessage());
 }
 
+// Create a public tenant and default namespace
+final String publicTenant = TopicName.PUBLIC_PROPERTY;
+final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE;
+try {
+if (!admin.properties().getProperties().contains(publicTenant)) {
+admin.properties().createProperty(
+publicTenant,
+new 
PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), 
Sets.newHashSet(cluster)));
+}
+if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+admin.namespaces().createNamespace(defaultNamespace);
 
 Review comment:
   For v2, we need to specify the "replication" cluster. I added a convenience 
method in #1562, though you can just add : 
   ```java
   admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(config.getClusterName());
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Honor User specified Subscription Types while running functions (#1560)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1fc4b35  Honor User specified Subscription Types while running 
functions (#1560)
1fc4b35 is described below

commit 1fc4b35b5988936ef57f6f7e109e9afd23c138a8
Author: Sanjeev Kulkarni 
AuthorDate: Thu Apr 12 11:50:31 2018 -0700

Honor User specified Subscription Types while running functions (#1560)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java  | 7 +++
 .../pulsar/functions/instance/processors/MessageProcessor.java   | 4 +++-
 pulsar-functions/instance/src/main/python/python_instance.py | 9 ++---
 .../instance/src/main/python/python_instance_main.py | 2 ++
 .../org/apache/pulsar/functions/runtime/JavaInstanceMain.java| 4 
 .../java/org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 ++
 .../org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java  | 6 --
 .../java/org/apache/pulsar/functions/utils/FunctionConfig.java   | 3 ++-
 8 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 71ba928..34abea1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -293,6 +293,13 @@ public class CmdFunctions extends CmdBase {
 functionConfig.setParallelism(num);
 }
 
+if (functionConfig.getSubscriptionType() != null
+&& functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
+&& functionConfig.getProcessingGuarantees() != null
+&& functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+throw new IllegalArgumentException("Effectively Once can only 
be acheived with Failover subscription");
+}
+
 functionConfig.setAutoAck(true);
 inferMissingArguments(functionConfig);
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 1167d73..5f0e242 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -42,8 +42,10 @@ public interface MessageProcessor extends AutoCloseable {
 FunctionDetails.SubscriptionType fnSubType = 
functionDetails.getSubscriptionType();
 ProcessingGuarantees processingGuarantees = 
functionDetails.getProcessingGuarantees();
 SubscriptionType subType;
-if (null == fnSubType || FunctionDetails.SubscriptionType.SHARED == 
fnSubType) {
+if (FunctionDetails.SubscriptionType.SHARED == fnSubType) {
 subType = SubscriptionType.Shared;
+} else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) {
+subType = SubscriptionType.Exclusive;
 } else {
 subType = SubscriptionType.Failover;
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index c0165b6..53ca3f0 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -119,9 +119,12 @@ class PythonInstance(object):
 
   def run(self):
 # Setup consumers and input deserializers
-mode = pulsar._pulsar.ConsumerType.Exclusive
-if self.atmost_once:
-  mode = pulsar._pulsar.ConsumerType.Shared
+mode = pulsar._pulsar.ConsumerType.Shared
+if self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'):
+  mode = pulsar._pulsar.ConsumerType.Exclusive
+elif self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'):
+  mode = pulsar._pulsar.ConsumerType.Failover
+
 subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
 str(self.instance_config.function_details.namespace) + 
"/" + \
 str(self.instance_config.function_details.name)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0e9d857..42b1af3 100644
--- 

[GitHub] sijie closed pull request #1560: Honor User specified Subscription Types while running functions

2018-04-12 Thread GitBox
sijie closed pull request #1560: Honor User specified Subscription Types while 
running functions
URL: https://github.com/apache/incubator-pulsar/pull/1560
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 71ba928cb7..34abea169e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -293,6 +293,13 @@ void processArguments() throws Exception {
 functionConfig.setParallelism(num);
 }
 
+if (functionConfig.getSubscriptionType() != null
+&& functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
+&& functionConfig.getProcessingGuarantees() != null
+&& functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+throw new IllegalArgumentException("Effectively Once can only 
be acheived with Failover subscription");
+}
+
 functionConfig.setAutoAck(true);
 inferMissingArguments(functionConfig);
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 1167d73167..5f0e242796 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -42,8 +42,10 @@ static MessageProcessor create(PulsarClient client,
 FunctionDetails.SubscriptionType fnSubType = 
functionDetails.getSubscriptionType();
 ProcessingGuarantees processingGuarantees = 
functionDetails.getProcessingGuarantees();
 SubscriptionType subType;
-if (null == fnSubType || FunctionDetails.SubscriptionType.SHARED == 
fnSubType) {
+if (FunctionDetails.SubscriptionType.SHARED == fnSubType) {
 subType = SubscriptionType.Shared;
+} else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) {
+subType = SubscriptionType.Exclusive;
 } else {
 subType = SubscriptionType.Failover;
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index c0165b600e..53ca3f0571 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -119,9 +119,12 @@ def __init__(self, instance_id, function_id, 
function_version, function_details,
 
   def run(self):
 # Setup consumers and input deserializers
-mode = pulsar._pulsar.ConsumerType.Exclusive
-if self.atmost_once:
-  mode = pulsar._pulsar.ConsumerType.Shared
+mode = pulsar._pulsar.ConsumerType.Shared
+if self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'):
+  mode = pulsar._pulsar.ConsumerType.Exclusive
+elif self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'):
+  mode = pulsar._pulsar.ConsumerType.Failover
+
 subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
 str(self.instance_config.function_details.namespace) + 
"/" + \
 str(self.instance_config.function_details.name)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0e9d85775c..42b1af31aa 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -67,6 +67,7 @@ def main():
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function 
Version')
   parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
+  parser.add_argument('--subscription_type', required=True, help='Subscription 
Type')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, 

[GitHub] sijie closed pull request #1561: Utility for integration tests to wait for ZK

2018-04-12 Thread GitBox
sijie closed pull request #1561: Utility for integration tests to wait for ZK
URL: https://github.com/apache/incubator-pulsar/pull/1561
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
index c5b8e7019d..f7771ddf81 100644
--- 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
@@ -75,6 +75,26 @@ public static boolean zookeeperRunning(DockerClient docker, 
String containerId)
 return false;
 }
 
+public static boolean waitZooKeeperUp(DockerClient docker, String cluster, 
int timeout, TimeUnit timeoutUnit)
+throws Exception {
+Optional zookeeper = zookeeperSet(docker, 
cluster).stream().findAny();
+if (zookeeper.isPresent()) {
+long timeoutMillis = timeoutUnit.toMillis(timeout);
+long pollMillis = 1000;
+while (timeoutMillis > 0) {
+if (zookeeperRunning(docker, zookeeper.get())) {
+return true;
+}
+Thread.sleep(pollMillis);
+timeoutMillis -= pollMillis;
+}
+return false;
+} else {
+LOG.warn("No zookeeper containers found");
+return false;
+}
+}
+
 public static boolean runOnAnyBroker(DockerClient docker, String cluster, 
String... cmds) throws Exception {
 Optional broker = DockerUtils.cubeIdsWithLabels(
 docker,ImmutableMap.of("service", "pulsar-broker", "cluster", 
cluster)).stream().findAny();
@@ -266,4 +286,9 @@ public static void stopAllProxies(DockerClient docker, 
String cluster) {
 return DockerUtils.cubeIdsWithLabels(docker, 
ImmutableMap.of("service", "pulsar-proxy",
  
"cluster", cluster));
 }
+
+public static Set zookeeperSet(DockerClient docker, String 
cluster) {
+return DockerUtils.cubeIdsWithLabels(docker, 
ImmutableMap.of("service", "zookeeper",
+ 
"cluster", cluster));
+}
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181181049
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
 ##
 @@ -399,24 +408,39 @@ protected void 
handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
 }
 }
 
-private boolean addPendingLookupRequests(long requestId, 
CompletableFuture future) {
-if (pendingLookupRequestSemaphore.tryAcquire()) {
-pendingLookupRequests.put(requestId, future);
-eventLoopGroup.schedule(() -> {
-if (!future.isDone()) {
-future.completeExceptionally(new TimeoutException(
-requestId + " lookup request timedout after ms " + 
operationTimeoutMs));
-}
-}, operationTimeoutMs, TimeUnit.MILLISECONDS);
-return true;
-}
-return false;
+private void addPendingLookupRequests(long requestId, 
CompletableFuture future) {
 
 Review comment:
   better to leave a comment that the caller of this method needs to be 
protected under a semaphore


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181183798
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
 ##
 @@ -508,8 +533,14 @@ protected boolean isHandshakeCompleted() {
 if (log.isDebugEnabled()) {
 log.debug("{} Failed to add lookup-request into pending 
queue", requestId);
 }
-future.completeExceptionally(new 
PulsarClientException.TooManyRequestsException(
-"Failed due to too many pending lookup requests"));
+if (waitingLookupRequestSemaphore.tryAcquire()) {
+waitingLookupRequests.add(Pair.of(requestId, Pair.of(request, 
future)));
+} else {
+future.completeExceptionally(new 
PulsarClientException.TooManyRequestsException(
 
 Review comment:
   I think the logging here can be improved.
   
   it is essentially saying - "There are {x} lookup request outstanding and {y} 
request pending", x limited by `concurrentLookupRequest` while y limited by 
`maxLookupRequest`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181183133
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 ##
 @@ -51,7 +51,8 @@
 private String tlsTrustCertsFilePath = "";
 private boolean tlsAllowInsecureConnection = false;
 private boolean tlsHostnameVerificationEnable = false;
-private int concurrentLookupRequest = 5;
+private int concurrentLookupRequest = 5000;
+private int maxLookupRequest = 2;
 
 Review comment:
   based the logic you changed, we need to set `maxLookupRequest` to 5 to 
make the logic similar as before, right?
   
   before this change, `concurrentLookupRequest` is 5


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181180624
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 ##
 @@ -246,14 +246,24 @@ ClientBuilder authentication(String authPluginClassName, 
Map aut
 ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
 
 /**
- * Number of concurrent lookup-requests allowed on each broker-connection 
to prevent overload on broker.
+ * Number of concurrent lookup-requests allowed to send on each 
broker-connection to prevent overload on broker.
  * (default: 5000) It should be configured with higher value only 
in case of it requires to produce/subscribe
  * on thousands of topic using created {@link PulsarClient}
  *
  * @param maxConcurrentLookupRequests
  */
 ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
 
+/**
+ * Number of max lookup-requests allowed on each broker-connection to 
prevent overload on broker.
+ * (default: 2) It should not be smaller than 
maxConcurrentLookupRequests.
+ * Requests that inside maxConcurrentLookupRequests already send to 
broker, and requests beyond
+ * maxConcurrentLookupRequests and under maxLookupRequests will wait in 
each client cnx.
+ *
+ * @param maxLookupRequests
 
 Review comment:
   I would encourage add `@since` annotation in the comment to indicate when 
the method was introduce.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1538: Add rate limit for client lookup requests

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1538: Add rate limit for client 
lookup requests
URL: https://github.com/apache/incubator-pulsar/pull/1538#discussion_r181184955
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
 ##
 @@ -508,8 +533,14 @@ protected boolean isHandshakeCompleted() {
 if (log.isDebugEnabled()) {
 log.debug("{} Failed to add lookup-request into pending 
queue", requestId);
 }
-future.completeExceptionally(new 
PulsarClientException.TooManyRequestsException(
-"Failed due to too many pending lookup requests"));
+if (waitingLookupRequestSemaphore.tryAcquire()) {
 
 Review comment:
   nit:
   
   I think you can use a bounded queue instead of two variables - one queue + 
one semaphore.
   
   if you are using a bounded queue, the logic here can be simplified:
   
   ```
   if (!waitingLookupRequests.offer(..)) {
   
fail(..)
   }
   ```
   
   
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html#offer(E)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1562: Converted to v2 topic 
names test related to ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#discussion_r181184646
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
 ##
 @@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.lookup.v1;
+
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.broker.lookup.TopicLookupBase;
+import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Path("/v2/destination/")
 
 Review comment:
   It's probably worth a comment in the code, but it's correct. 
   
   The lookup API was already `/v2/` in Pulsar 1.xxx. This was internally 
versioned at Yahoo to not clash from earlier API. 
   
   Since we're adding now the "Pulsar v2" we cannot rename this topic lookup 
into `/v1`. Rather the difference here would be : 
`lookup/v2/destination/persistent/prop/cluster/ns/topic` vs 
`lookup/v2/topic/persistent/prop/ns/topic`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1562: Converted to v2 topic 
names test related to ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#discussion_r181183241
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
 ##
 @@ -128,7 +128,7 @@ public void deleteNamespace(@PathParam("property") String 
property, @PathParam("
 }
 
 @DELETE
-@Path("/{property}/{namespace}/bundle/{bundle}")
 
 Review comment:
   This is only relative to v2 namespace. It was broken when v2 handler created.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1562: Converted to v2 topic names 
test related to ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562#discussion_r181179101
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
 ##
 @@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.lookup.v1;
+
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.broker.lookup.TopicLookupBase;
+import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Path("/v2/destination/")
 
 Review comment:
   the path here is wrong?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1557: Added Kafka Source and Kafka Sink to Pulsar Connect

2018-04-12 Thread GitBox
srkukarni commented on issue #1557: Added Kafka Source and Kafka Sink to Pulsar 
Connect
URL: https://github.com/apache/incubator-pulsar/pull/1557#issuecomment-380899820
 
 
   @merlimat I've standardized the versions used by both modules. Please take a 
look again. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181171240
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
 
 Review comment:
   I am not sure we should parallel the offloading here. It is better to do 
this in sequence and update metadata one by one, for considering the failures 
(e.g. successfully offloading but fail to update offload contexts). 
`CompletableFuture.allOf` sometime can be bad for production. E.g. if S3 / cold 
storage outage for a while, pulsar can't offload the ledgers. the ledgers are 
accumulated, when S3 / cold storage is back online, the `allOf` logic will 
potentially start offloading ledgers in parallel, which can overwhelm the 
brokers and cause the system getting into a state that never recover.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181173676
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
+.whenComplete((ignore, offloadException) -> {
+// If any of the offload operations failed 
offloadException will
+// be set. However some of the operations could have been 
successful.
+// If so, save the contexts for the successful prefix and 
notify the client
+// of the error if any occurred.
+List> successfulOffloads = 
Lists.newArrayList();
+int errors = 0;
+synchronized (this) {
+ledgersListMutex.lock();
+
+// loop through results of offload operations. If an 
error occurred
+// check that the ledger still exists for the ML. If 
not, then it was
+// trimmed and the error can be ignored.
+for (Pair> context : 
contexts) {
+if (context.getRight().isCompletedExceptionally()) 
{
+if (ledgers.containsKey(context.getLeft())) {
+errors++;
+}
+} else {
+
successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+}
+}
+ledgersListMutex.unlock();
+}
+log.info("[{}] All offload operations complete, {} 
successful, {} errors",
+ name, 

[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181176581
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
+.whenComplete((ignore, offloadException) -> {
+// If any of the offload operations failed 
offloadException will
+// be set. However some of the operations could have been 
successful.
+// If so, save the contexts for the successful prefix and 
notify the client
+// of the error if any occurred.
+List> successfulOffloads = 
Lists.newArrayList();
+int errors = 0;
+synchronized (this) {
+ledgersListMutex.lock();
+
+// loop through results of offload operations. If an 
error occurred
+// check that the ledger still exists for the ML. If 
not, then it was
+// trimmed and the error can be ignored.
+for (Pair> context : 
contexts) {
+if (context.getRight().isCompletedExceptionally()) 
{
+if (ledgers.containsKey(context.getLeft())) {
+errors++;
+}
+} else {
+
successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+}
+}
+ledgersListMutex.unlock();
+}
+log.info("[{}] All offload operations complete, {} 
successful, {} errors",
+ name, 

[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181176505
 
 

 ##
 File path: managed-ledger/src/main/proto/MLDataFormats.proto
 ##
 @@ -27,6 +27,7 @@ message ManagedLedgerInfo {
optional int64 entries  = 2;
optional int64 size = 3;
optional int64 timestamp = 4;
+optional bytes offloadContext = 5;
 
 Review comment:
   nit: seems indent is wrong?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181167383
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
 
 Review comment:
   can ledgers be empty? if ledgers can be empty, a NPE will be thrown here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181176228
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
+.whenComplete((ignore, offloadException) -> {
+// If any of the offload operations failed 
offloadException will
+// be set. However some of the operations could have been 
successful.
+// If so, save the contexts for the successful prefix and 
notify the client
+// of the error if any occurred.
+List> successfulOffloads = 
Lists.newArrayList();
+int errors = 0;
+synchronized (this) {
+ledgersListMutex.lock();
+
+// loop through results of offload operations. If an 
error occurred
+// check that the ledger still exists for the ML. If 
not, then it was
+// trimmed and the error can be ignored.
+for (Pair> context : 
contexts) {
+if (context.getRight().isCompletedExceptionally()) 
{
+if (ledgers.containsKey(context.getLeft())) {
+errors++;
+}
+} else {
+
successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+}
+}
+ledgersListMutex.unlock();
+}
+log.info("[{}] All offload operations complete, {} 
successful, {} errors",
+ name, 

[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181168613
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
 
 Review comment:
   nit: space between ')' and 'pos'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181175295
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
+.whenComplete((ignore, offloadException) -> {
+// If any of the offload operations failed 
offloadException will
+// be set. However some of the operations could have been 
successful.
+// If so, save the contexts for the successful prefix and 
notify the client
+// of the error if any occurred.
+List> successfulOffloads = 
Lists.newArrayList();
+int errors = 0;
+synchronized (this) {
+ledgersListMutex.lock();
+
+// loop through results of offload operations. If an 
error occurred
+// check that the ledger still exists for the ML. If 
not, then it was
+// trimmed and the error can be ignored.
+for (Pair> context : 
contexts) {
+if (context.getRight().isCompletedExceptionally()) 
{
+if (ledgers.containsKey(context.getLeft())) {
+errors++;
+}
+} else {
+
successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+}
+}
+ledgersListMutex.unlock();
+}
+log.info("[{}] All offload operations complete, {} 
successful, {} errors",
+ name, 

[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181176348
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
+return;
+}
+
+long current = ledgers.lastKey();
+
+// the first ledger which will not be offloaded. Defaults to 
current,
+// in the case that the whole headmap is offloaded. Otherwise it 
will
+// be set as we iterate through the headmap values
+long firstLedgerRetained = current;
+for (LedgerInfo ls : ledgers.headMap(current).values()) {
+if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
+if (!ls.hasOffloadContext()) {
+ledgersToOffload.add(ls);
+}
+} else {
+firstLedgerRetained = ls.getLedgerId();
+break;
+}
+}
+firstUnoffloaded = PositionImpl.get(firstLedgerRetained, 0);
+}
+
+if (ledgersToOffload.isEmpty()) {
+callback.offloadComplete(firstUnoffloaded, ctx);
+return;
+}
+
+log.info("[{}] Going to offload ledgers {}", name,
+ ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+List< Pair> > contexts = 
ledgersToOffload.stream()
+.map(info -> {
+long ledgerId = info.getLedgerId();
+Map extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
+CompletableFuture context = 
getLedgerHandle(ledgerId).thenCompose(
+readHandle -> 
config.getLedgerOffloader().offload(readHandle, extraMetadata));
+return Pair.of(ledgerId, context);
+}).collect(Collectors.toList());
+
+CompletableFuture.allOf(contexts.stream().map(p -> 
p.getRight()).toArray(CompletableFuture[]::new))
+.whenComplete((ignore, offloadException) -> {
+// If any of the offload operations failed 
offloadException will
+// be set. However some of the operations could have been 
successful.
+// If so, save the contexts for the successful prefix and 
notify the client
+// of the error if any occurred.
+List> successfulOffloads = 
Lists.newArrayList();
+int errors = 0;
+synchronized (this) {
+ledgersListMutex.lock();
+
+// loop through results of offload operations. If an 
error occurred
+// check that the ledger still exists for the ML. If 
not, then it was
+// trimmed and the error can be ignored.
+for (Pair> context : 
contexts) {
+if (context.getRight().isCompletedExceptionally()) 
{
+if (ledgers.containsKey(context.getLeft())) {
+errors++;
+}
+} else {
+
successfulOffloads.add(Pair.of(context.getLeft(), context.getRight().join()));
+}
+}
+ledgersListMutex.unlock();
+}
+log.info("[{}] All offload operations complete, {} 
successful, {} errors",
+ name, 

[GitHub] sijie commented on a change in pull request #1549: offloadPrefix implementation for managed ledger

2018-04-12 Thread GitBox
sijie commented on a change in pull request #1549: offloadPrefix implementation 
for managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1549#discussion_r181167564
 
 

 ##
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 ##
 @@ -1850,9 +1854,172 @@ public void offloadFailed(ManagedLedgerException e, 
Object ctx) {
 
 @Override
 public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-callback.offloadFailed(new ManagedLedgerException("Not implemented"), 
ctx);
+PositionImpl requestOffloadTo = (PositionImpl)pos;
+if (!isValidPosition(requestOffloadTo)) {
+callback.offloadFailed(new InvalidCursorPositionException("Invalid 
position for offload"), ctx);
+return;
+}
+
+Position firstUnoffloaded;
+
+List ledgersToOffload = Lists.newArrayList();
+synchronized (this) {
+log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", 
name, ledgers.keySet(),
+ TOTAL_SIZE_UPDATER.get(this));
+
+if (STATE_UPDATER.get(this) == State.Closed) {
+log.info("[{}] Ignoring offload request since the managed 
ledger was already closed", name);
+callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
+   "Can't offload closed managed 
ledger"), ctx);
 
 Review comment:
   nit: better add the managed ledger name in the exception message?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Documentation for non-persistent topics (#1468)

2018-04-12 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fafc963  Documentation for non-persistent topics (#1468)
fafc963 is described below

commit fafc9632c6f7facc1ce703ee7db9e6248abec345
Author: Luc Perkins 
AuthorDate: Thu Apr 12 11:06:06 2018 -0700

Documentation for non-persistent topics (#1468)

* add entries to C doc, config docs, etc., plus new cookbook

* merge information from C doc into include

* reshuffle existing material

* Finish draft of NPT cookbook

* clean up C+A section

* clean up client API section

* Change name of doc and update links

* change name of cookbook in C+A doc
---
 site/_data/config/broker.yaml  |  6 ++
 site/_data/sidebar.yaml|  2 +
 .../explanations/non-persistent-topics.md  | 62 ++--
 site/_includes/topic.html  |  2 +-
 .../latest/cookbooks/non-persistent-messaging.md   | 66 ++
 .../getting-started/ConceptsAndArchitecture.md | 58 +--
 6 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index 9cc9fb6..3b01180 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -18,6 +18,12 @@
 #
 
 configs:
+- name: enablePersistentTopics
+  default: 'true'
+  description: Whether persistent topics are enabled on the broker
+- name: enableNonPersistentTopics
+  default: 'true'
+  description: Whether non-persistent topics are enabled on the broker
 - name: functionsWorkerEnabled
   description: Whether the Pulsar Functions worker service is enabled in the 
broker
   default: 'false'
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index a4eeb0b..68fe426 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -134,6 +134,8 @@ groups:
   docs:
   - title: Message deduplication
 endpoint: message-deduplication
+  - title: Non-persistent messaging
+endpoint: non-persistent-messaging
   - title: Partitioned topics
 endpoint: PartitionedTopics
   - title: Retention and expiry
diff --git a/site/_includes/explanations/non-persistent-topics.md 
b/site/_includes/explanations/non-persistent-topics.md
index 74ff663..a94358e 100644
--- a/site/_includes/explanations/non-persistent-topics.md
+++ b/site/_includes/explanations/non-persistent-topics.md
@@ -19,64 +19,10 @@
 
 -->
 
-{% include admonition.html type="success" title='Notice' content="
-This feature is still in experimental mode and implementation details may 
change in future release.
-" %}
+By default, Pulsar persistently stores *all* {% popover unacknowledged %} 
messages on multiple [BookKeeper](#persistent-storage) {% popover bookies %} 
(storage nodes). Data for messages on persistent topics can thus survive {% 
popover broker %} restarts and subscriber failover.
 
-As name suggests, non-persist topic does not persist messages into any durable 
storage disk unlike persistent topic where messages are durably persisted on 
multiple disks. 
+Pulsar also, however, supports **non-persistent topics**, which are topics on 
which messages are *never* persisted to disk and live only in memory. When 
using non-persistent delivery, killing a Pulsar {% popover broker %} or 
disconnecting a subscriber to a topic means that all in-transit messages are 
lost on that (non-persistent) topic, meaning that clients may see message loss.
 
-Therefore, if you are using persistent delivery, messages are persisted to 
disk/database so that they will survive a broker restart or subscriber 
failover. While using non-persistent delivery, if you kill a broker or 
subscriber is disconnected then subscriber will lose all in-transit messages. 
So, client may see message loss with non-persistent topic.
+Non-persistent topics have names of this form (note the `non-persistent` in 
the name):
 
-- In non-persistent topic, as soon as broker receives published message, it 
immediately delivers this message to all connected subscribers without 
persisting them into any storage. So, if subscriber gets disconnected with 
broker then broker will not be able to deliver those in-transit messages and 
subscribers will never be able to receive those messages again. Broker also 
drops a message for the consumer, if consumer does not have enough permit to 
consume message, or consumer TCP channel [...]
-- Broker only allows configured number of in-flight messages per client 
connection. So, if producer tries to publish messages higher than this rate, 
then broker silently drops those new incoming messages without processing and 
delivering them to the subscribers. However, broker acknowledges with special 

[GitHub] merlimat closed pull request #1468: Documentation for non-persistent topics

2018-04-12 Thread GitBox
merlimat closed pull request #1468: Documentation for non-persistent topics
URL: https://github.com/apache/incubator-pulsar/pull/1468
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index 9cc9fb6d9b..3b0118083a 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -18,6 +18,12 @@
 #
 
 configs:
+- name: enablePersistentTopics
+  default: 'true'
+  description: Whether persistent topics are enabled on the broker
+- name: enableNonPersistentTopics
+  default: 'true'
+  description: Whether non-persistent topics are enabled on the broker
 - name: functionsWorkerEnabled
   description: Whether the Pulsar Functions worker service is enabled in the 
broker
   default: 'false'
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index a4eeb0ba5c..68fe4263c4 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -134,6 +134,8 @@ groups:
   docs:
   - title: Message deduplication
 endpoint: message-deduplication
+  - title: Non-persistent messaging
+endpoint: non-persistent-messaging
   - title: Partitioned topics
 endpoint: PartitionedTopics
   - title: Retention and expiry
diff --git a/site/_includes/explanations/non-persistent-topics.md 
b/site/_includes/explanations/non-persistent-topics.md
index 74ff6631cf..a94358e585 100644
--- a/site/_includes/explanations/non-persistent-topics.md
+++ b/site/_includes/explanations/non-persistent-topics.md
@@ -19,64 +19,10 @@
 
 -->
 
-{% include admonition.html type="success" title='Notice' content="
-This feature is still in experimental mode and implementation details may 
change in future release.
-" %}
+By default, Pulsar persistently stores *all* {% popover unacknowledged %} 
messages on multiple [BookKeeper](#persistent-storage) {% popover bookies %} 
(storage nodes). Data for messages on persistent topics can thus survive {% 
popover broker %} restarts and subscriber failover.
 
-As name suggests, non-persist topic does not persist messages into any durable 
storage disk unlike persistent topic where messages are durably persisted on 
multiple disks. 
+Pulsar also, however, supports **non-persistent topics**, which are topics on 
which messages are *never* persisted to disk and live only in memory. When 
using non-persistent delivery, killing a Pulsar {% popover broker %} or 
disconnecting a subscriber to a topic means that all in-transit messages are 
lost on that (non-persistent) topic, meaning that clients may see message loss.
 
-Therefore, if you are using persistent delivery, messages are persisted to 
disk/database so that they will survive a broker restart or subscriber 
failover. While using non-persistent delivery, if you kill a broker or 
subscriber is disconnected then subscriber will lose all in-transit messages. 
So, client may see message loss with non-persistent topic.
+Non-persistent topics have names of this form (note the `non-persistent` in 
the name):
 
-- In non-persistent topic, as soon as broker receives published message, it 
immediately delivers this message to all connected subscribers without 
persisting them into any storage. So, if subscriber gets disconnected with 
broker then broker will not be able to deliver those in-transit messages and 
subscribers will never be able to receive those messages again. Broker also 
drops a message for the consumer, if consumer does not have enough permit to 
consume message, or consumer TCP channel is not writable. Therefore, consumer 
receiver queue size (to accommodate enough permits) and TCP-receiver window 
size (to keep channel writable) should be configured properly to avoid message 
drop for that consumer.
-- Broker only allows configured number of in-flight messages per client 
connection. So, if producer tries to publish messages higher than this rate, 
then broker silently drops those new incoming messages without processing and 
delivering them to the subscribers. However, broker acknowledges with special 
message-id (`msg-id: -1:-1`) for those dropped messages to signal producer 
about the message drop.
-
- Performance
-
-Non-persistent messaging is usually faster than persistent messaging because 
broker does not persist messages and immediately sends ack back to producer as 
soon as that message deliver to all connected subscribers. Therefore, producer 
sees comparatively low publish latency with non-persistent topic.
-
-
- Client API
-
-
-A topic name will look like:
-
-```
-non-persistent://my-property/us-west/my-namespace/my-topic
-```
-
-Producer and consumer can connect to non-persistent topic in a similar way, as 
persistent topic except topic name must start with `non-persistent`.
-
-Non-persistent topic 

[GitHub] merlimat commented on a change in pull request #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1535: Issue #1536: Introduce 
Short Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535#discussion_r181172292
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
 ##
 @@ -143,8 +149,54 @@ public static void main(String[] args) throws Exception {
 // Ignore
 }
 
+// Create public tenant
+PropertyAdmin publicProperty = new PropertyAdmin();
+byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
 
 Review comment:
   I think we would still miss the `public/default` in standalone mode since it 
doesn't use this script. 
   
   Standalone is creating `sample/standalone/ns1` by default when it starts up. 
We should just change that into `public/default`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #1561: Utility for integration tests to wait for ZK

2018-04-12 Thread GitBox
merlimat commented on issue #1561: Utility for integration tests to wait for ZK
URL: https://github.com/apache/incubator-pulsar/pull/1561#issuecomment-380890956
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1551: Json schema codec

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#discussion_r181162646
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 ##
 @@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Sets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+private static final Logger log = 
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
+
+@BeforeMethod
+@Override
+protected void setup() throws Exception {
+super.internalSetup();
+super.producerBaseSetup();
+}
+
+@AfterMethod
+@Override
+protected void cleanup() throws Exception {
+super.internalCleanup();
+}
+
+public static class JsonEncodedPojo {
+private String message;
+
+public JsonEncodedPojo() {
+}
+
+public JsonEncodedPojo(String message) {
+this.message = message;
+}
+
+public String getMessage() {
+return message;
+}
+
+public void setMessage(String message) {
+this.message = message;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+JsonEncodedPojo that = (JsonEncodedPojo) o;
+return Objects.equals(message, that.message);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(message);
+}
+
+@Override
+public String toString() {
+return MoreObjects.toStringHelper(this)
+.add("message", message)
+.toString();
+}
+}
+
+@Test
+public void testJsonProducerAndConsumer() throws Exception {
 
 Review comment:
   I think we'd need to have tests with multiple combination of features, eg: 
* Partitioned topics
* Batching enabled
* Consumer vs Reader


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1551: Json schema codec

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#discussion_r181162788
 
 

 ##
 File path: pulsar-client/pom.xml
 ##
 @@ -97,6 +97,12 @@
   4.4.9
 
 
+
 
 Review comment:
   The version should be added to `dependencyManagement` in top level pom


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1551: Json schema codec

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#discussion_r181163341
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
 ##
 @@ -63,7 +63,7 @@
  */
 byte[] getData();
 
-T getValue();
+T getValue() throws SchemaSerializationException;
 
 Review comment:
   Why add the exception when getting the value? I think the schema should be 
validated internally: if a messages reaches the application is should already 
be "valid" and not throw exception here, because that would force the 
application to handle it and 90% of people would be confused on what to do at 
this point.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
sijie commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380883551
 
 
   @mgodave works for me. +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1550: Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real LAC

2018-04-12 Thread GitBox
sijie closed pull request #1550: Issue #1517: make getLastConfirmedEntry in 
ManagedLedgerImpl return real LAC
URL: https://github.com/apache/incubator-pulsar/pull/1550
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bf2cd8c587..85b5879e95 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -357,7 +357,19 @@ public void operationFailed(MetaStoreException e) {
 STATE_UPDATER.set(this, State.LedgerOpened);
 lastLedgerCreatedTimestamp = 
System.currentTimeMillis();
 currentLedger = lh;
+
 lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+// bypass empty ledgers, find last ledger with Message 
if possible.
+while (lastConfirmedEntry.getEntryId() == -1) {
+Map.Entry formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+if (formerLedger != null) {
+LedgerInfo ledgerInfo = 
formerLedger.getValue();
+lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+} else {
+break;
+}
+}
+
 LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
 ledgers.put(lh.getId(), info);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 12415dbb1d..9891d0bbfc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -60,10 +60,10 @@ protected void cleanup() throws Exception {
 
 @Test
 public void testSimpleReader() throws Exception {
-Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
 .startMessageId(MessageId.earliest).create();
 
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
 .create();
 for (int i = 0; i < 10; i++) {
 String message = "my-message-" + i;
@@ -88,14 +88,14 @@ public void testSimpleReader() throws Exception {
 
 @Test
 public void testReaderAfterMessagesWerePublished() throws Exception {
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
 .create();
 for (int i = 0; i < 10; i++) {
 String message = "my-message-" + i;
 producer.send(message.getBytes());
 }
 
-Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
 .startMessageId(MessageId.earliest).create();
 
 Message msg = null;
@@ -116,17 +116,17 @@ public void testReaderAfterMessagesWerePublished() throws 
Exception {
 
 @Test
 public void testMultipleReaders() throws Exception {
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders")
 .create();
 for (int i = 0; i < 10; i++) {
 String message = "my-message-" + i;
 producer.send(message.getBytes());
 }
 
-Reader reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+Reader reader1 = 

[incubator-pulsar] branch master updated: Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real LAC (#1550)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 2de50a7  Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl 
return real LAC (#1550)
2de50a7 is described below

commit 2de50a762628647fcf1b7873b325fb1103c2b198
Author: Jia Zhai 
AuthorDate: Thu Apr 12 10:26:45 2018 -0700

Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real 
LAC (#1550)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 
 .../apache/pulsar/client/api/TopicReaderTest.java  | 72 --
 2 files changed, 66 insertions(+), 18 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8649d1d..c0201dc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -359,7 +359,19 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 STATE_UPDATER.set(this, State.LedgerOpened);
 lastLedgerCreatedTimestamp = 
System.currentTimeMillis();
 currentLedger = lh;
+
 lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+// bypass empty ledgers, find last ledger with Message 
if possible.
+while (lastConfirmedEntry.getEntryId() == -1) {
+Map.Entry formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+if (formerLedger != null) {
+LedgerInfo ledgerInfo = 
formerLedger.getValue();
+lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+} else {
+break;
+}
+}
+
 LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
 ledgers.put(lh.getId(), info);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 12415db..9891d0b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -60,10 +60,10 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
 @Test
 public void testSimpleReader() throws Exception {
-Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
 .startMessageId(MessageId.earliest).create();
 
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
 .create();
 for (int i = 0; i < 10; i++) {
 String message = "my-message-" + i;
@@ -88,14 +88,14 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
 @Test
 public void testReaderAfterMessagesWerePublished() throws Exception {
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
 .create();
 for (int i = 0; i < 10; i++) {
 String message = "my-message-" + i;
 producer.send(message.getBytes());
 }
 
-Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+Reader reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
 .startMessageId(MessageId.earliest).create();
 
 Message msg = null;
@@ -116,17 +116,17 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
 @Test
 public void testMultipleReaders() throws Exception {
-Producer producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+Producer producer = 

[incubator-pulsar] branch master updated: Use `AtomicInteger` in LoggingFunction (#1558)

2018-04-12 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 199961e  Use `AtomicInteger` in LoggingFunction (#1558)
199961e is described below

commit 199961ef56e6f8cd1073245480b85e283987a95f
Author: Sijie Guo 
AuthorDate: Thu Apr 12 09:59:23 2018 -0700

Use `AtomicInteger` in LoggingFunction (#1558)

* Add `counter` back in LoggingFunction

* Switch to use AtomicInteger
---
 .../org/apache/pulsar/functions/api/examples/LoggingFunction.java  | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index 9cf3f55..e8e12ae 100644
--- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -29,14 +29,13 @@ import org.slf4j.Logger;
  */
 public class LoggingFunction implements Function {
 
-private static final AtomicIntegerFieldUpdater 
COUNTER_UPDATER =
-AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter");
+private final AtomicInteger counter = new AtomicInteger(0);
 
 @Override
 public String process(String input, Context context) {
 Logger LOG = context.getLogger();
 
-int counterLocal = COUNTER_UPDATER.incrementAndGet(this);
+int counterLocal = counter.incrementAndGet();
 if ((counterLocal & Integer.MAX_VALUE) % 10 == 0) {
 LOG.info("Handled {} messages", counterLocal);
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.


[GitHub] merlimat closed pull request #1558: Use `AtomicInteger` in LoggingFunction

2018-04-12 Thread GitBox
merlimat closed pull request #1558: Use `AtomicInteger` in LoggingFunction
URL: https://github.com/apache/incubator-pulsar/pull/1558
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index 9cf3f550ad..e8e12aea73 100644
--- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -29,14 +29,13 @@
  */
 public class LoggingFunction implements Function {
 
-private static final AtomicIntegerFieldUpdater 
COUNTER_UPDATER =
-AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter");
+private final AtomicInteger counter = new AtomicInteger(0);
 
 @Override
 public String process(String input, Context context) {
 Logger LOG = context.getLogger();
 
-int counterLocal = COUNTER_UPDATER.incrementAndGet(this);
+int counterLocal = counter.incrementAndGet();
 if ((counterLocal & Integer.MAX_VALUE) % 10 == 0) {
 LOG.info("Handled {} messages", counterLocal);
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1557: Added Kafka Source and Kafka Sink to Pulsar Connect

2018-04-12 Thread GitBox
merlimat commented on a change in pull request #1557: Added Kafka Source and 
Kafka Sink to Pulsar Connect
URL: https://github.com/apache/incubator-pulsar/pull/1557#discussion_r181147097
 
 

 ##
 File path: pom.xml
 ##
 @@ -144,6 +144,7 @@ flexible messaging model and an intuitive client 
API.
 2.2.0
 3.4.0
 4.1.5
+0.10.0.0
 
 Review comment:
   There's already a version of kafka clients used for the wrapper. We should 
probably standardize on one version


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #1559: Allow proxy to come up before any brokers have

2018-04-12 Thread GitBox
merlimat closed pull request #1559: Allow proxy to come up before any brokers 
have
URL: https://github.com/apache/incubator-pulsar/pull/1559
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 27d5a9b743..78223473b2 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -20,6 +20,7 @@
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +33,7 @@
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,18 +97,17 @@ public LoadManagerReport deserialize(String key, byte[] 
content) throws Exceptio
 });
 
 // Do initial fetch of brokers list
-availableBrokersSet = availableBrokersCache.get();
-updateBrokerList(availableBrokersSet);
+try {
+updateBrokerList(availableBrokersCache.get());
+} catch (NoNodeException nne) { // can happen if no broker started yet
+updateBrokerList(Collections.emptySet());
+}
 }
 
 public List getAvailableBrokers() {
 return availableBrokers;
 }
 
-public Set getAvailableBrokersSet() {
-return availableBrokersSet;
-}
-
 public ZooKeeperCache getLocalZkCache() {
 return localZkCache;
 }
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index e2e46aa8f5..fed55c3eb6 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -194,11 +194,15 @@ public void invalidate(final String path) {
  * @throws InterruptedException
  */
 public boolean exists(final String path) throws KeeperException, 
InterruptedException {
+return exists(path, this);
+}
+
+private boolean exists(final String path, Watcher watcher) throws 
KeeperException, InterruptedException {
 try {
 return existsCache.get(path, new Callable() {
 @Override
 public Boolean call() throws Exception {
-return zkSession.get().exists(path, ZooKeeperCache.this) 
!= null;
+return zkSession.get().exists(path, watcher) != null;
 }
 });
 } catch (ExecutionException e) {
@@ -386,7 +390,14 @@ public Boolean call() throws Exception {
 });
 } catch (ExecutionException e) {
 Throwable cause = e.getCause();
-if (cause instanceof KeeperException) {
+// The node we want may not exist yet, so put a watcher on its 
existance
+// before throwing up the exception. Its possible that the node 
could have
+// been created after the call to getChildren, but before the call 
to exists().
+// If this is the case, exists will return true, and we just call 
getChildren again.
+if (cause instanceof KeeperException.NoNodeException
+&& exists(path, watcher)) {
+return getChildren(path, watcher);
+} else if (cause instanceof KeeperException) {
 throw (KeeperException) cause;
 } else if (cause instanceof InterruptedException) {
 throw (InterruptedException) cause;
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index af00abc42d..09c3ea1524 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
@@ -175,6 +176,63 @@ void 

[incubator-pulsar] branch master updated: Allow proxy to come up before any brokers have (#1559)

2018-04-12 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1c9c074  Allow proxy to come up before any brokers have (#1559)
1c9c074 is described below

commit 1c9c07428b6281b88c56ebeed5f595cf7dafabda
Author: Ivan Kelly 
AuthorDate: Thu Apr 12 18:39:13 2018 +0200

Allow proxy to come up before any brokers have (#1559)

If no brokers have come up, then /loadbalance/brokers will not have
been created. Previously, if a proxy came up at this point, it would
get a NoNodeException when it tried to watch the children of this
path, and the proxy itself would hang.

This change modifies the ZooKeeper cache, so that if you try to
getChildren on a node that doesn't exist, a watcher on that path's
existance will be created before the NoNodeException is thrown.

Callers can then call getChildren with a watcher, and expect the
watcher to trigger when the the node is created and has another node
created below it.
---
 .../proxy/server/util/ZookeeperCacheLoader.java| 13 ++---
 .../apache/pulsar/zookeeper/ZooKeeperCache.java| 15 +-
 .../pulsar/zookeeper/ZookeeperCacheTest.java   | 58 ++
 3 files changed, 78 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 27d5a9b..7822347 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server.util;
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +33,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,18 +97,17 @@ public class ZookeeperCacheLoader implements Closeable {
 });
 
 // Do initial fetch of brokers list
-availableBrokersSet = availableBrokersCache.get();
-updateBrokerList(availableBrokersSet);
+try {
+updateBrokerList(availableBrokersCache.get());
+} catch (NoNodeException nne) { // can happen if no broker started yet
+updateBrokerList(Collections.emptySet());
+}
 }
 
 public List getAvailableBrokers() {
 return availableBrokers;
 }
 
-public Set getAvailableBrokersSet() {
-return availableBrokersSet;
-}
-
 public ZooKeeperCache getLocalZkCache() {
 return localZkCache;
 }
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index e2e46aa..fed55c3 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -194,11 +194,15 @@ public abstract class ZooKeeperCache implements Watcher {
  * @throws InterruptedException
  */
 public boolean exists(final String path) throws KeeperException, 
InterruptedException {
+return exists(path, this);
+}
+
+private boolean exists(final String path, Watcher watcher) throws 
KeeperException, InterruptedException {
 try {
 return existsCache.get(path, new Callable() {
 @Override
 public Boolean call() throws Exception {
-return zkSession.get().exists(path, ZooKeeperCache.this) 
!= null;
+return zkSession.get().exists(path, watcher) != null;
 }
 });
 } catch (ExecutionException e) {
@@ -386,7 +390,14 @@ public abstract class ZooKeeperCache implements Watcher {
 });
 } catch (ExecutionException e) {
 Throwable cause = e.getCause();
-if (cause instanceof KeeperException) {
+// The node we want may not exist yet, so put a watcher on its 
existance
+// before throwing up the exception. Its possible that the node 
could have
+// been created after the call to getChildren, but before the call 
to exists().
+// If this is the case, exists will return true, and we just call 
getChildren again.
+if (cause instanceof 

[GitHub] mgodave commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380866438
 
 
   @sijie circling back on your tutorial comment, I merely added some more 
files to an existing set of examples. I agree that this should not live under 
test but in order to keep like things together and to isolate changes I propose 
we leave these new files here and submit a new PR to move the tutorial files to 
a different location.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1560: Honor User specified Subscription Types while running functions

2018-04-12 Thread GitBox
srkukarni commented on issue #1560: Honor User specified Subscription Types 
while running functions
URL: https://github.com/apache/incubator-pulsar/pull/1560#issuecomment-380864475
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat opened a new pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase

2018-04-12 Thread GitBox
merlimat opened a new pull request #1562: Converted to v2 topic names test 
related to ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562
 
 
   ### Motivation
   
   This is the first set of changes to convert unit tests to use v2 topic names.
   
   The PR contains also fixes, mainly to v2 lookup APIs to make sure all the 
functionalities were supported.  
   
   This first batch started the conversion from `ProducerConsumerBase` test 
class and all the tests that extends from that. A `V1_ProducerConsumerTest` 
test has been retained with v1 name to ensure we don't break existing topics.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mgodave commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
mgodave commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380845432
 
 
   Yeah, I had hoped to take a look yesterday evening but fell asleep. Looking 
now. re: example under "test", I honestly didn't look, I copied an existing 
example so I could run an end-to-end test, I will move it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ivankelly opened a new pull request #1561: Utility for integration tests to wait for ZK

2018-04-12 Thread GitBox
ivankelly opened a new pull request #1561: Utility for integration tests to 
wait for ZK
URL: https://github.com/apache/incubator-pulsar/pull/1561
 
 
   Utility to allow an integration test to wait for zookeeper to come up
   before proceeding.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #1560: Honor User specified Subscription Types while running functions

2018-04-12 Thread GitBox
srkukarni commented on issue #1560: Honor User specified Subscription Types 
while running functions
URL: https://github.com/apache/incubator-pulsar/pull/1560#issuecomment-380825894
 
 
   @sijie @merlimat 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #1560: Honor User specified Subscription Types while running functions

2018-04-12 Thread GitBox
srkukarni opened a new pull request #1560: Honor User specified Subscription 
Types while running functions
URL: https://github.com/apache/incubator-pulsar/pull/1560
 
 
   ### Motivation
   
   Explain here the context, and why you're making that change.
   What is the problem you're trying to solve.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1535: Issue #1536: Introduce Short Topic Name

2018-04-12 Thread GitBox
sijie commented on issue #1535: Issue #1536: Introduce Short Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535#issuecomment-380743572
 
 
   @merlimat this is ready for reviews 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #1551: Json schema codec

2018-04-12 Thread GitBox
sijie commented on issue #1551: Json schema codec
URL: https://github.com/apache/incubator-pulsar/pull/1551#issuecomment-380743237
 
 
   @mgodave "javax/validation/constraints/NotBlank" not found - it sounds 
strange to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #1548: Fixed race condition intruduced managed ledger addEntry introduced in #1521

2018-04-12 Thread GitBox
sijie closed pull request #1548: Fixed race condition intruduced managed ledger 
addEntry introduced in #1521
URL: https://github.com/apache/incubator-pulsar/pull/1548
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bf2cd8c587..c60bd49827 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -92,7 +92,6 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -202,7 +201,7 @@
  * Queue of pending entries to be added to the managed ledger. Typically 
entries are queued when a new ledger is
  * created asynchronously and hence there is no ready ledger to write into.
  */
-final GrowableArrayBlockingQueue pendingAddEntries = new 
GrowableArrayBlockingQueue<>();
+final ConcurrentLinkedQueue pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
 // //
 
@@ -488,10 +487,11 @@ public void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback, Object ctx)
 }
 
 OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
-pendingAddEntries.add(addOperation);
 
 // Jump to specific thread to avoid contention from writers writing 
from different threads
 executor.executeOrdered(name, safeRun(() -> {
+pendingAddEntries.add(addOperation);
+
 internalAsyncAddEntry(addOperation);
 }));
 }
@@ -1197,7 +1197,7 @@ public synchronized void updateLedgersIdsComplete(Stat 
stat) {
 }
 
 // Process all the pending addEntry requests
-for (OpAddEntry op : pendingAddEntries.toList()) {
+for (OpAddEntry op : pendingAddEntries) {
 op.setLedger(currentLedger);
 ++currentLedgerEntries;
 currentLedgerSize += op.data.readableBytes();


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: Fixed race condition intruduced managed ledger addEntry introduced in #1521 (#1548)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new aff6c04  Fixed race condition intruduced managed ledger addEntry 
introduced in #1521 (#1548)
aff6c04 is described below

commit aff6c04b156cffaf5b96e05afda5e096fbe34729
Author: Matteo Merli 
AuthorDate: Thu Apr 12 02:39:31 2018 -0700

Fixed race condition intruduced managed ledger addEntry introduced in #1521 
(#1548)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 15c380c..8649d1d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -95,7 +95,6 @@ import org.apache.pulsar.common.api.Commands;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -205,7 +204,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
  * Queue of pending entries to be added to the managed ledger. Typically 
entries are queued when a new ledger is
  * created asynchronously and hence there is no ready ledger to write into.
  */
-final GrowableArrayBlockingQueue pendingAddEntries = new 
GrowableArrayBlockingQueue<>();
+final ConcurrentLinkedQueue pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
 // //
 
@@ -491,10 +490,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
-pendingAddEntries.add(addOperation);
 
 // Jump to specific thread to avoid contention from writers writing 
from different threads
 executor.executeOrdered(name, safeRun(() -> {
+pendingAddEntries.add(addOperation);
+
 internalAsyncAddEntry(addOperation);
 }));
 }
@@ -1200,7 +1200,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 // Process all the pending addEntry requests
-for (OpAddEntry op : pendingAddEntries.toList()) {
+for (OpAddEntry op : pendingAddEntries) {
 op.setLedger(currentLedger);
 ++currentLedgerEntries;
 currentLedgerSize += op.data.readableBytes();

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.


[incubator-pulsar] branch master updated: remove unnecessary proto def in FunctionsConfig (#1531)

2018-04-12 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0fd4e84  remove unnecessary proto def in FunctionsConfig (#1531)
0fd4e84 is described below

commit 0fd4e84e2f5ee9cbfff729bfef0fd00933a83e9b
Author: Boyang Jerry Peng 
AuthorDate: Wed Apr 11 23:48:35 2018 -0700

remove unnecessary proto def in FunctionsConfig (#1531)
---
 pulsar-functions/proto/src/main/proto/Function.proto | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index ab4a726..c12059f 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -53,9 +53,6 @@ message FunctionConfig {
 bool autoAck = 13;
 repeated string inputs = 14;
 int32 parallelism = 15;
-// Fully qualified function name
-// (alternative to specifying tenant/namespace/name)
-string fqfn = 16;
 }
 
 message PackageLocationMetaData {

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.


[GitHub] sijie closed pull request #1531: remove unnecessary proto def in FunctionsConfig

2018-04-12 Thread GitBox
sijie closed pull request #1531: remove unnecessary proto def in FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index ab4a726432..c12059fe45 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -53,9 +53,6 @@ message FunctionConfig {
 bool autoAck = 13;
 repeated string inputs = 14;
 int32 parallelism = 15;
-// Fully qualified function name
-// (alternative to specifying tenant/namespace/name)
-string fqfn = 16;
 }
 
 message PackageLocationMetaData {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1531: remove unnecessary proto def in FunctionsConfig

2018-04-12 Thread GitBox
jerrypeng commented on issue #1531: remove unnecessary proto def in 
FunctionsConfig
URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-380694622
 
 
   @sijie can we merge this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #1552: renaming/changing FunctionConfig

2018-04-12 Thread GitBox
jerrypeng commented on issue #1552: renaming/changing FunctionConfig
URL: https://github.com/apache/incubator-pulsar/pull/1552#issuecomment-380694542
 
 
   @sijie I have rebased


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services