[GitHub] cckellogg opened a new pull request #2609: Initial commit for a go based pulsar cli.

2018-09-18 Thread GitBox
cckellogg opened a new pull request #2609: Initial commit for a go based pulsar 
cli.
URL: https://github.com/apache/incubator-pulsar/pull/2609
 
 
   ### Motivation
   
   Initial patch to create a pulsar go based admin cli. This will help improve 
the cli performance, make the user experience better with more detailed 
examples and help documentation and simply the command structure.
   
   ### Modifications
   
   New go based admin cli
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat opened a new pull request #2608: Enforce boost-python was found by CMake

2018-09-18 Thread GitBox
merlimat opened a new pull request #2608: Enforce boost-python was found by 
CMake
URL: https://github.com/apache/incubator-pulsar/pull/2608
 
 
   ### Motivation
   
   Ensure the CMake preparation fails when the correct Boost-Python library is 
not found. With the current behavior, the build goes through even when not 
found, and then the resulting `_pulsar.so` will not be linked with boost-python 
and will fail to load at runtime.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch asf-site updated: Trigger asf-site sync

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 161473e  Trigger asf-site sync
161473e is described below

commit 161473ec2000373399fa85305f6d9e02537cda1d
Author: Matteo Merli 
AuthorDate: Tue Sep 18 21:38:49 2018 -0700

Trigger asf-site sync



[incubator-pulsar] branch master updated: [tests] remove PersistentQueueE2ETest#testConsumersWithDifferentPermits (#2604)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4fdc21a  [tests] remove 
PersistentQueueE2ETest#testConsumersWithDifferentPermits (#2604)
4fdc21a is described below

commit 4fdc21afba77184109757f62679eec4d972dbbc8
Author: Sijie Guo 
AuthorDate: Tue Sep 18 21:27:41 2018 -0700

[tests] remove PersistentQueueE2ETest#testConsumersWithDifferentPermits 
(#2604)

*Motivation*

There is no ways to gurantee how fast a consumer can consume. so it makes 
no sense to compare the different permits
between two consumers.

*Changes*

Remove PersistentQueueE2ETest#testConsumersWithDifferentPermits



[GitHub] sijie closed pull request #2604: [tests] remove PersistentQueueE2ETest#testConsumersWithDifferentPermits

2018-09-18 Thread GitBox
sijie closed pull request #2604: [tests] remove 
PersistentQueueE2ETest#testConsumersWithDifferentPermits
URL: https://github.com/apache/incubator-pulsar/pull/2604
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia closed pull request #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia closed pull request #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 38a55bf47c..8f9eefea94 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ void runCmd() throws Exception {
 protected String DEPRECATED_sinkConfigString;
 @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
 protected String sinkConfigString;
+@Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
+protected boolean autoAck = true;
+@Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
+protected Long timeoutMs;
 
 protected SinkConfig sinkConfig;
 
@@ -399,6 +403,15 @@ void processArguments() throws Exception {
 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
 }
 
+sinkConfig.setAutoAck(autoAck);
+if (timeoutMs != null) {
+sinkConfig.setTimeoutMs(timeoutMs);
+}
+
+if (null != sinkConfigString) {
+sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+}
+
 inferMissingArguments(sinkConfig);
 }
 
@@ -585,7 +598,11 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
 : SubscriptionType.SHARED;
 sourceSpecBuilder.setSubscriptionType(subType);
 
-functionDetailsBuilder.setAutoAck(true);
+functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+if (sinkConfig.getTimeoutMs() != null) {
+sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+}
+
 functionDetailsBuilder.setSource(sourceSpecBuilder);
 
 // set up sink spec
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index e3ba70e025..fb13d39913 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -133,6 +133,7 @@ public SinkConfig getSinkConfig() {
 sinkConfig.setTenant(TENANT);
 sinkConfig.setNamespace(NAMESPACE);
 sinkConfig.setName(NAME);
+sinkConfig.setAutoAck(true);
 
 sinkConfig.setInputs(INPUTS_LIST);
 sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index be886c41ae..1132fa6b86 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -78,6 +78,9 @@
 private boolean retainOrdering;
 @isValidResources
 private Resources resources;
+private boolean autoAck;
+@isPositiveNumber
+private Long timeoutMs;
 
 @isFileExists
 private String archive;
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 5b1fb41d8b..42f6c9c181 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1014,6 +1014,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `update`
@@ -1091,6 +1093,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `available-sinks`


 


This is an automated message from the Apache Git Service.
To respond to the 

[incubator-pulsar] branch master updated: add auto ack and timeout configurable (#2503)

2018-09-18 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8e14175  add auto ack and timeout configurable (#2503)
8e14175 is described below

commit 8e141752cbbbd038ecc183fd63b72ef19dfb32cc
Author: Rajan Dhabalia 
AuthorDate: Tue Sep 18 21:19:05 2018 -0700

add auto ack and timeout configurable (#2503)

* add auto ack and timeout configurable

* Fix test
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java| 19 ++-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java |  1 +
 .../org/apache/pulsar/functions/utils/SinkConfig.java |  3 +++
 site2/docs/reference-pulsar-admin.md  |  4 
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 38a55bf..8f9eefe 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ public class CmdSinks extends CmdBase {
 protected String DEPRECATED_sinkConfigString;
 @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
 protected String sinkConfigString;
+@Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
+protected boolean autoAck = true;
+@Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
+protected Long timeoutMs;
 
 protected SinkConfig sinkConfig;
 
@@ -399,6 +403,15 @@ public class CmdSinks extends CmdBase {
 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
 }
 
+sinkConfig.setAutoAck(autoAck);
+if (timeoutMs != null) {
+sinkConfig.setTimeoutMs(timeoutMs);
+}
+
+if (null != sinkConfigString) {
+sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+}
+
 inferMissingArguments(sinkConfig);
 }
 
@@ -585,7 +598,11 @@ public class CmdSinks extends CmdBase {
 : SubscriptionType.SHARED;
 sourceSpecBuilder.setSubscriptionType(subType);
 
-functionDetailsBuilder.setAutoAck(true);
+functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+if (sinkConfig.getTimeoutMs() != null) {
+sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+}
+
 functionDetailsBuilder.setSource(sourceSpecBuilder);
 
 // set up sink spec
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index e3ba70e..fb13d39 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -133,6 +133,7 @@ public class TestCmdSinks {
 sinkConfig.setTenant(TENANT);
 sinkConfig.setNamespace(NAMESPACE);
 sinkConfig.setName(NAME);
+sinkConfig.setAutoAck(true);
 
 sinkConfig.setInputs(INPUTS_LIST);
 sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index be886c4..1132fa6 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -78,6 +78,9 @@ public class SinkConfig {
 private boolean retainOrdering;
 @isValidResources
 private Resources resources;
+private boolean autoAck;
+@isPositiveNumber
+private Long timeoutMs;
 
 @isFileExists
 private String archive;
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 5b1fb41..42f6c9c 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1014,6 +1014,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `update`
@@ -1091,6 +1093,8 @@ Options
 |`--sink-type`|The built-in sinks's 

[incubator-pulsar] 01/03: Fixed linking of boost and boost-python for osx build (#2302)

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch py-build-fix-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 6de19c8d6e4fbe322df7e2f02af5df661626
Author: Matteo Merli 
AuthorDate: Fri Aug 3 19:31:22 2018 +0900

Fixed linking of boost and boost-python for osx build (#2302)

Build of wheels files in OSX is broken due to failures in finding boost 
after installing from brew.
---
 pulsar-client-cpp/python/pkg/osx/build.sh.template | 14 +++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pkg/osx/build.sh.template 
b/pulsar-client-cpp/python/pkg/osx/build.sh.template
old mode 100644
new mode 100755
index 5720b27..71687ac
--- a/pulsar-client-cpp/python/pkg/osx/build.sh.template
+++ b/pulsar-client-cpp/python/pkg/osx/build.sh.template
@@ -23,21 +23,29 @@ set -e
 GITTAG=#TAG#
 PYTHONVER=#PYTHONVER#
 
+BOOST_VERSION=1.67
+
 /usr/bin/ruby -e "$(curl -fsSL 
https://raw.githubusercontent.com/Homebrew/install/master/install)"
 
-brew install openssl git boost pkg-config jsoncpp cmake protobuf260 log4cxx
+brew install openssl git boost@$BOOST_VERSION pkg-config jsoncpp cmake 
protobuf260 log4cxx
+
+brew link --force boost@$BOOST_VERSION
 
 if [ "$PYTHONVER" = "PYTHON2" ]
 then
-   brew install python@2 boost-python
+   brew install python@2 boost-python@$BOOST_VERSION
+
+   brew link --force boost-python@$BOOST_VERSION
 fi
 
 if [ "$PYTHONVER" = "PYTHON3" ]
 then
-   brew install python boost-python3
+   brew install python boost-python3@BOOST_VERSION
+   brew link --force boost-python3@$BOOST_VERSION
 fi
 
 brew link --force protobuf260
+
 rm -rf incubator-pulsar
 git clone --depth 1 --branch $GITTAG 
https://github.com/apache/incubator-pulsar.git
 cd incubator-pulsar/pulsar-client-cpp



[incubator-pulsar] branch py-build-fix-2.1.1 created (now fbc19fd)

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch py-build-fix-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


  at fbc19fd  Enforce boost-python was found by CMake

This branch includes the following new commits:

 new 6de19c8  Fixed linking of boost and boost-python for osx build (#2302)
 new e65143a  Fixed linking of python wrapper to boost-python in MacOS 
build (#2366)
 new fbc19fd  Enforce boost-python was found by CMake

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[incubator-pulsar] 03/03: Enforce boost-python was found by CMake

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch py-build-fix-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit fbc19fde3d0983daabc70b50ad6f7ec8ea9119af
Author: Matteo Merli 
AuthorDate: Tue Sep 18 20:33:58 2018 -0700

Enforce boost-python was found by CMake
---
 pulsar-client-cpp/python/CMakeLists.txt | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt 
b/pulsar-client-cpp/python/CMakeLists.txt
index d506844..78fcc67 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -37,7 +37,12 @@ endif()
 
 # Try all possible boost-python variable namings
 set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY} ${Boost_PYTHON3_LIBRARY}
-${Boost_PYTHON27-MT_LIBRARY} 
${Boost_PYTHON37-MT_LIBRARY})
+${Boost_PYTHON27-MT_LIBRARY} 
${Boost_PYTHON37-MT_LIBRARY}
+${Boost_PYTHON27-MT_LIBRARY_RELEASE} 
${Boost_PYTHON37-MT_LIBRARY_RELEASE})
+
+if (NOT PYTHON_WRAPPER_LIBS)
+MESSAGE(FATAL_ERROR "Could not find Boost Python library")
+endif ()
 
 if (APPLE)
 target_link_libraries(_pulsar -Wl,-all_load pulsarStatic 
${PYTHON_WRAPPER_LIBS})



[incubator-pulsar] 02/03: Fixed linking of python wrapper to boost-python in MacOS build (#2366)

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch py-build-fix-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit e65143a97ad5277682f84cd5f11fe7ad62c78e9c
Author: Matteo Merli 
AuthorDate: Mon Aug 13 16:56:23 2018 -0700

Fixed linking of python wrapper to boost-python in MacOS build (#2366)
---
 pulsar-client-cpp/python/CMakeLists.txt|  4 +++-
 pulsar-client-cpp/python/pkg/osx/build.sh.template | 14 ++
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt 
b/pulsar-client-cpp/python/CMakeLists.txt
index 8e70f72..d506844 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -35,7 +35,9 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
   set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS 
"${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -undefined dynamic_lookup")
 endif()
 
-set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY} ${Boost_PYTHON3_LIBRARY})
+# Try all possible boost-python variable namings
+set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY} ${Boost_PYTHON3_LIBRARY}
+${Boost_PYTHON27-MT_LIBRARY} 
${Boost_PYTHON37-MT_LIBRARY})
 
 if (APPLE)
 target_link_libraries(_pulsar -Wl,-all_load pulsarStatic 
${PYTHON_WRAPPER_LIBS})
diff --git a/pulsar-client-cpp/python/pkg/osx/build.sh.template 
b/pulsar-client-cpp/python/pkg/osx/build.sh.template
index 71687ac..40b7420 100755
--- a/pulsar-client-cpp/python/pkg/osx/build.sh.template
+++ b/pulsar-client-cpp/python/pkg/osx/build.sh.template
@@ -27,7 +27,7 @@ BOOST_VERSION=1.67
 
 /usr/bin/ruby -e "$(curl -fsSL 
https://raw.githubusercontent.com/Homebrew/install/master/install)"
 
-brew install openssl git boost@$BOOST_VERSION pkg-config jsoncpp cmake 
protobuf260 log4cxx
+brew install openssl git boost@$BOOST_VERSION pkg-config jsoncpp cmake 
protobuf260
 
 brew link --force boost@$BOOST_VERSION
 
@@ -40,8 +40,8 @@ fi
 
 if [ "$PYTHONVER" = "PYTHON3" ]
 then
-   brew install python boost-python3@BOOST_VERSION
-   brew link --force boost-python3@$BOOST_VERSION
+   brew install python boost-python3
+   brew link --force boost-python3
 fi
 
 brew link --force protobuf260
@@ -52,4 +52,10 @@ cd incubator-pulsar/pulsar-client-cpp
 cmake . -DBUILD_TESTS=OFF -DLINK_STATIC=ON
 make _pulsar -j8
 cd python
-python setup.py bdist_wheel
+
+if [ "$PYTHONVER" = "PYTHON2" ]
+then
+  python2 setup.py bdist_wheel
+else
+  python3 setup.py bdist_wheel
+fi



[GitHub] srkukarni commented on issue #2586: fixing/improving logging for function instance

2018-09-18 Thread GitBox
srkukarni commented on issue #2586: fixing/improving logging for function 
instance
URL: https://github.com/apache/incubator-pulsar/pull/2586#issuecomment-422638074
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: fix behavior of JSONSchema for derived classes (#2577)

2018-09-18 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 701f3ea  fix behavior of JSONSchema for derived classes (#2577)
701f3ea is described below

commit 701f3ea7e3541f61f064167043de04e8a2db5c0f
Author: Boyang Jerry Peng 
AuthorDate: Tue Sep 18 20:17:17 2018 -0700

fix behavior of JSONSchema for derived classes (#2577)

* fix behavior of JSONSchema for derived classes

* adding pom changes

* fix for nested classes

* adding to test

* removing debug log

* improving tests
---
 pulsar-client-schema/pom.xml   |  6 ++
 .../pulsar/client/impl/schema/JSONSchema.java  | 41 ++---
 .../pulsar/client/schema/JSONSchemaTest.java   | 67 --
 .../pulsar/client/schema/SchemaTestUtils.java  | 42 +++---
 4 files changed, 138 insertions(+), 18 deletions(-)

diff --git a/pulsar-client-schema/pom.xml b/pulsar-client-schema/pom.xml
index 477627e..5e72f6b 100644
--- a/pulsar-client-schema/pom.xml
+++ b/pulsar-client-schema/pom.xml
@@ -70,6 +70,12 @@
 jackson-module-jsonSchema
 
 
+
+com.google.code.gson
+gson
+${gson.version}
+
+
 
 
 org.apache.pulsar
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 5465f8c..d46c84b 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -22,28 +22,53 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import com.google.gson.ExclusionStrategy;
+import com.google.gson.FieldAttributes;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
+@Slf4j
 public class JSONSchema implements Schema{
 
 private final org.apache.avro.Schema schema;
 private final SchemaInfo schemaInfo;
-private final ObjectMapper objectMapper;
+private final Gson gson;
 private final Class pojo;
 private Map properties;
 
 private JSONSchema(Class pojo, Map properties) {
 this.pojo = pojo;
 this.properties = properties;
-this.objectMapper = new ObjectMapper();
+this.gson = new GsonBuilder().addSerializationExclusionStrategy(new 
ExclusionStrategy() {
+Set classes = new HashSet<>();
+
+@Override
+public boolean shouldSkipField(FieldAttributes f) {
+boolean skip = !(f.getDeclaringClass().equals(pojo)
+|| classes.contains(f.getDeclaringClass().getName())
+|| f.getDeclaringClass().isAssignableFrom(pojo));
+if (!skip) {
+classes.add(f.getDeclaredClass().getName());
+}
+return skip;
+}
+
+@Override
+public boolean shouldSkipClass(Class clazz) {
+return false;
+}
+}).create();
 
 this.schema = ReflectData.AllowNull.get().getSchema(pojo);
 this.schemaInfo = new SchemaInfo();
@@ -55,9 +80,10 @@ public class JSONSchema implements Schema{
 
 @Override
 public byte[] encode(T message) throws SchemaSerializationException {
+
 try {
-return objectMapper.writeValueAsBytes(message);
-} catch (JsonProcessingException e) {
+return this.gson.toJson(message).getBytes();
+} catch (RuntimeException e) {
 throw new SchemaSerializationException(e);
 }
 }
@@ -65,8 +91,8 @@ public class JSONSchema implements Schema{
 @Override
 public T decode(byte[] bytes) {
 try {
-return objectMapper.readValue(new String(bytes), pojo);
-} catch (IOException e) {
+return this.gson.fromJson(new String(bytes), this.pojo);
+} catch (RuntimeException e) {
 throw new RuntimeException(new SchemaSerializationException(e));
 }
 }
@@ -85,6 +111,7 @@ public class 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218635418
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,153 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
 
 Review comment:
   I don't understand this logic. here it creates producer only if producer is 
not present under `publishProducers`. So, does it really need it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422597240
 
 
   @rdhabalia thanks for the review.  I have addressed your comments


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590573
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia removed a comment on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia removed a comment on issue #2503: add auto ack and timeout 
configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590512
 
 
   retest java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590512
 
 
   retest java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aahmed-se commented on issue #2578: Add support for schema extraction from a jar

2018-09-18 Thread GitBox
aahmed-se commented on issue #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#issuecomment-422589407
 
 
   Added Test cases


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes

2018-09-18 Thread GitBox
srkukarni commented on issue #2577: fix behavior of JSONSchema for derived 
classes
URL: https://github.com/apache/incubator-pulsar/pull/2577#issuecomment-422578421
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549)

2018-09-18 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 49fc5e5  [Function] avoid creating assignment snapshot and publish 
individual assigment msg (#2549)
49fc5e5 is described below

commit 49fc5e508a996cfe59949effbcaf0abfa46028ce
Author: Rajan Dhabalia 
AuthorDate: Tue Sep 18 15:22:12 2018 -0700

[Function] avoid creating assignment snapshot and publish individual 
assigment msg (#2549)

Fix: Compaction with last deleted keys not completing compaction

Delete assignment with empty payload
---
 conf/functions_worker.yml  |   2 +
 .../pulsar/compaction/TwoPhaseCompactor.java   |   2 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   2 +-
 .../worker/PulsarWorkerAssignmentTest.java | 370 +
 .../proto/src/main/proto/Request.proto |   5 -
 .../functions/worker/FunctionAssignmentTailer.java |  47 ++-
 .../functions/worker/FunctionRuntimeManager.java   | 283 +++-
 .../pulsar/functions/worker/SchedulerManager.java  | 126 ---
 .../pulsar/functions/worker/WorkerConfig.java  |   5 +-
 .../pulsar/functions/worker/WorkerService.java |  15 +-
 .../worker/scheduler/RoundRobinScheduler.java  |  10 +-
 .../worker/FunctionRuntimeManagerTest.java |  40 +--
 .../functions/worker/MembershipManagerTest.java|   3 +
 .../functions/worker/SchedulerManagerTest.java | 298 +
 14 files changed, 791 insertions(+), 417 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb..0c7b8af 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,6 +44,8 @@ rescheduleTimeoutMs: 6
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 3
+# Frequency how often worker performs compaction on function-topics
+topicCompactionFrequencySec: 1800
 metricsSamplingPeriodSec: 60
 # Enforce authentication
 authenticationEnabled: false
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 425e049..612b336 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -359,4 +359,4 @@ public class TwoPhaseCompactor extends Compactor {
 this.latestForKey = latestForKey;
 }
 }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 32c93b4..8e7b292 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1268,4 +1268,4 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 }
 }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
new file mode 100644
index 000..fe28a51
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import 

[GitHub] rdhabalia closed pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg

2018-09-18 Thread GitBox
rdhabalia closed pull request #2549: [Function] avoid creating assignment 
snapshot and publish individual assignment msg
URL: https://github.com/apache/incubator-pulsar/pull/2549
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb9f6..0c7b8af695 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,6 +44,8 @@ rescheduleTimeoutMs: 6
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 3
+# Frequency how often worker performs compaction on function-topics
+topicCompactionFrequencySec: 1800
 metricsSamplingPeriodSec: 60
 # Enforce authentication
 authenticationEnabled: false
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 425e04921b..612b33601b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -359,4 +359,4 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map
 this.latestForKey = latestForKey;
 }
 }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 32c93b4626..8e7b292d6a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1268,4 +1268,4 @@ public void testEmptyCompactionLedger() throws Exception {
 }
 }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
new file mode 100644
index 00..fe28a5119e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
+import 

[GitHub] aahmed-se opened a new pull request #2607: Add default value for test image org tag

2018-09-18 Thread GitBox
aahmed-se opened a new pull request #2607: Add default value for test image org 
tag
URL: https://github.com/apache/incubator-pulsar/pull/2607
 
 
   This adds a default value for the image tag in cases it's not speicifed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #2606: Add support for dead letter topics for java functions

2018-09-18 Thread GitBox
srkukarni opened a new pull request #2606: Add support for dead letter topics 
for java functions
URL: https://github.com/apache/incubator-pulsar/pull/2606
 
 
   ### Motivation
   
   This pr adds two options for running java functions
   1. Ability to control the number of times a message is retried by Pulsar
   2. Ability to redirect the messages that could not be processed even after 
retries to a dead letter topic
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch jerrypeng-patch-1 updated (e4eb459 -> e256a79)

2018-09-18 Thread jerrypeng
This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a change to branch jerrypeng-patch-1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


 discard e4eb459  Correcting SQL getting started title
 add 259275b  [tests] Fix the synchronization problem at 
BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
 add aebfbba  [tests] make PersistentQueueE2ETest tolerant failures during 
deleting topic at cleanup (#2600)
 add 6c07c81   [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache (#2598)
 add c866279  [java-client] Issue #2384: ConnectionHandler: Log stack trace 
instead of printing (#2599)
 add 8d50617  Fix: Compaction with last deleted keys not completing 
compaction (#2591)
 add fb396bf  [build] Fix docker organization parameter (#2597)
 add 7530d64  [tests] improve connector related integration tests (#2587)
 add e256a79  Correcting SQL getting started title

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e4eb459)
\
 N -- N -- N   refs/heads/jerrypeng-patch-1 (e256a79)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

No new revisions were added by this update.

Summary of changes:
 distribution/io/src/assemble/io.xml|   6 +
 docker/pom.xml |   3 -
 pom.xml|   1 +
 .../pulsar/compaction/TwoPhaseCompactor.java   |  90 +--
 .../broker/service/PersistentQueueE2ETest.java |  77 +++--
 .../client/impl/BrokerClientIntegrationTest.java   |  12 +-
 .../apache/pulsar/compaction/CompactionTest.java   |  56 ++
 .../pulsar/client/impl/ConnectionHandler.java  |   1 -
 .../functions/instance/JavaInstanceRunnable.java   |   3 +-
 .../pulsar/functions/source/TopicSchema.java   |   3 +
 ...rchAbstractSink.java => ElasticSearchSink.java} |   9 +-
 .../io/elasticsearch/ElasticSearchStringSink.java  |  35 --
 .../resources/META-INF/services/pulsar-io.yaml |   4 +-
 .../io/elasticsearch/ElasticSearchSinkTests.java   |   4 +-
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |   6 +-
 .../{KafkaStringSink.java => KafkaBytesSink.java}  |  25 -
 .../resources/META-INF/services/pulsar-io.yaml |   2 +-
 .../pulsar/zookeeper/ZookeeperCacheTest.java   |   8 +-
 site2/docs/io-quickstart.md|   2 +-
 .../version-2.1.0-incubating/io-quickstart.md  |   2 +-
 .../integration/functions/PulsarFunctionsTest.java |  39 +--
 .../functions/utils/CommandGenerator.java  |   2 +-
 .../integration/io/CassandraSinkArchiveTester.java | 121 -
 .../tests/integration/io/CassandraSinkTester.java  |  45 +---
 .../integration/io/ElasticSearchSinkTester.java|  34 +++---
 .../tests/integration/io/HdfsSinkTester.java   |  22 ++--
 .../tests/integration/io/JdbcSinkTester.java   |  38 ---
 .../tests/integration/io/KafkaSinkTester.java  |  28 +++--
 .../tests/integration/io/KafkaSourceTester.java|  12 +-
 .../pulsar/tests/integration/io/SinkTester.java|  25 -
 .../pulsar/tests/integration/io/SourceTester.java  |   4 +-
 .../tests/integration/suites/PulsarTestSuite.java  |  50 -
 .../integration/topologies/PulsarCluster.java  |  17 +++
 33 files changed, 352 insertions(+), 434 deletions(-)
 rename 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/{ElasticSearchAbstractSink.java
 => ElasticSearchSink.java} (94%)
 delete mode 100644 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
 rename 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/{KafkaStringSink.java 
=> KafkaBytesSink.java} (50%)
 delete mode 100644 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java



[GitHub] sijie closed pull request #2587: [tests] improve connector related integration tests

2018-09-18 Thread GitBox
sijie closed pull request #2587: [tests] improve connector related integration 
tests
URL: https://github.com/apache/incubator-pulsar/pull/2587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 08ff859e44..a509e19254 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
   connectors
   644
 
+
+
+  
${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar
+  connectors
+  644
+
   
 
diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7297..302bda80cc 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   org.apache.pulsar
   docker-images
   Apache Pulsar :: Docker Images
-  
-apachepulsar
-  
   
 pulsar
 grafana
diff --git a/pom.xml b/pom.xml
index 6d45a0093d..91ba14c55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.
 true
 false
 1
+apachepulsar
 
 
 
2.1.0-incubating
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0caf..fb7a76cdff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public void testReplayOnConsumerDisconnect() throws 
Exception {
 deleteTopic(topicName);
 }
 
-@Test
-public void testConsumersWithDifferentPermits() throws Exception {
-final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
-final String subName = "sub4";
-final int numMsgs = 1;
-
-final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
-final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
-final CountDownLatch latch = new CountDownLatch(numMsgs);
-
-int recvQ1 = 10;
-Consumer consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
-.messageListener((consumer, msg) -> {
-msgCountConsumer1.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-latch.countDown();
-} catch (PulsarClientException e) {
-fail("Should not fail");
-}
-}).subscribe();
-
-int recvQ2 = 1;
-Consumer consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
-.messageListener((consumer, msg) -> {
-msgCountConsumer2.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-latch.countDown();
-} catch (PulsarClientException e) {
-fail("Should not fail");
-}
-}).subscribe();
-
-List> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-Producer producer = pulsarClient.newProducer().topic(topicName)
-.enableBatching(false)
-.maxPendingMessages(numMsgs + 1)
-.messageRoutingMode(MessageRoutingMode.SinglePartition)
-.create();
-for (int i = 0; i < numMsgs; i++) {
-String message = "msg-" + i;
-futures.add(producer.sendAsync(message.getBytes()));
-}
-FutureUtil.waitForAll(futures).get();
-producer.close();
-
-latch.await(5, TimeUnit.SECONDS);
-
-assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + 
recvQ2), numMsgs * 0.1);
-assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), 
numMsgs * 0.1);
-
-consumer1.close();
-consumer2.close();
-
-deleteTopic(topicName);
-}
-
 // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
 // expected numbers. keeping this disabled to not break the build, but 
nevertheless this gives good insight into
 // how the round robin distribution algorithm is behaving
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 

[incubator-pulsar] branch master updated: [tests] improve connector related integration tests (#2587)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7530d64  [tests] improve connector related integration tests (#2587)
7530d64 is described below

commit 7530d64a679a0783122b18058c1148c89c0fee0a
Author: Sijie Guo 
AuthorDate: Tue Sep 18 14:50:51 2018 -0700

[tests] improve connector related integration tests (#2587)

*Motivation*

with more and more connector are added, it becomes expensive to start all 
external services at the begin.

*Changes*

- refactor the connector testing framework to start external service before 
methods
- fix kafka, cassandra and mysql connectors
---
 distribution/io/src/assemble/io.xml|   6 +
 .../broker/service/PersistentQueueE2ETest.java |  60 --
 .../functions/instance/JavaInstanceRunnable.java   |   3 +-
 .../pulsar/functions/source/TopicSchema.java   |   3 +
 ...rchAbstractSink.java => ElasticSearchSink.java} |   9 +-
 .../io/elasticsearch/ElasticSearchStringSink.java  |  35 --
 .../resources/META-INF/services/pulsar-io.yaml |   4 +-
 .../io/elasticsearch/ElasticSearchSinkTests.java   |   4 +-
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |   6 +-
 .../{KafkaStringSink.java => KafkaBytesSink.java}  |  25 -
 .../resources/META-INF/services/pulsar-io.yaml |   2 +-
 site2/docs/io-quickstart.md|   2 +-
 .../version-2.1.0-incubating/io-quickstart.md  |   2 +-
 .../integration/functions/PulsarFunctionsTest.java |  39 +--
 .../functions/utils/CommandGenerator.java  |   2 +-
 .../integration/io/CassandraSinkArchiveTester.java | 121 -
 .../tests/integration/io/CassandraSinkTester.java  |  45 +---
 .../integration/io/ElasticSearchSinkTester.java|  34 +++---
 .../tests/integration/io/HdfsSinkTester.java   |  22 ++--
 .../tests/integration/io/JdbcSinkTester.java   |  38 ---
 .../tests/integration/io/KafkaSinkTester.java  |  28 +++--
 .../tests/integration/io/KafkaSourceTester.java|  12 +-
 .../pulsar/tests/integration/io/SinkTester.java|  25 -
 .../pulsar/tests/integration/io/SourceTester.java  |   4 +-
 .../tests/integration/suites/PulsarTestSuite.java  |  50 -
 .../integration/topologies/PulsarCluster.java  |  17 +++
 26 files changed, 212 insertions(+), 386 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 08ff859..a509e19 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
   connectors
   644
 
+
+
+  
${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar
+  connectors
+  644
+
   
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0..fb7a76c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public class PersistentQueueE2ETest extends BrokerTestBase 
{
 deleteTopic(topicName);
 }
 
-@Test
-public void testConsumersWithDifferentPermits() throws Exception {
-final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
-final String subName = "sub4";
-final int numMsgs = 1;
-
-final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
-final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
-final CountDownLatch latch = new CountDownLatch(numMsgs);
-
-int recvQ1 = 10;
-Consumer consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
-.messageListener((consumer, msg) -> {
-msgCountConsumer1.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-latch.countDown();
-} catch (PulsarClientException e) {
-fail("Should not fail");
-}
-}).subscribe();
-
-int recvQ2 = 1;
-Consumer consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
-.messageListener((consumer, msg) -> {
-msgCountConsumer2.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-

[GitHub] sijie closed pull request #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
sijie closed pull request #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7297..302bda80cc 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   org.apache.pulsar
   docker-images
   Apache Pulsar :: Docker Images
-  
-apachepulsar
-  
   
 pulsar
 grafana
diff --git a/pom.xml b/pom.xml
index 6d45a0093d..91ba14c55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.
 true
 false
 1
+apachepulsar
 
 
 
2.1.0-incubating


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: [build] Fix docker organization parameter (#2597)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fb396bf  [build] Fix docker organization parameter (#2597)
fb396bf is described below

commit fb396bf8313f728f5659286e4d8924bcf37d4832
Author: Sijie Guo 
AuthorDate: Tue Sep 18 14:50:12 2018 -0700

[build] Fix docker organization parameter (#2597)

*Motivation*

docker orgnization is missing for building test image. so the build will be 
failing with `-Pdocker`.

*Changes*

Move the docker organization parameter to root pom file.
---
 docker/pom.xml | 3 ---
 pom.xml| 1 +
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7..302bda8 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   org.apache.pulsar
   docker-images
   Apache Pulsar :: Docker Images
-  
-apachepulsar
-  
   
 pulsar
 grafana
diff --git a/pom.xml b/pom.xml
index 6d45a00..91ba14c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.
 true
 false
 1
+apachepulsar
 
 
 
2.1.0-incubating



[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606849
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606215
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606215
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603818
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603562
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218605442
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218604865
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] jerrypeng opened a new pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng opened a new pull request #2605: implement topic routing on a per 
record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605
 
 
   ### Motivation
   
   There are use cases that the destination topic for a message cannot be 
determined at source submission time.  This requires the ability for sources to 
to set which topic a record should be written to.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
srkukarni commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422562254
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aahmed-se commented on issue #2504: Add Presto Sql Test

2018-09-18 Thread GitBox
aahmed-se commented on issue #2504: Add Presto Sql Test
URL: https://github.com/apache/incubator-pulsar/pull/2504#issuecomment-422553884
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg

2018-09-18 Thread GitBox
rdhabalia commented on issue #2549: [Function] avoid creating assignment 
snapshot and publish individual assignment msg
URL: https://github.com/apache/incubator-pulsar/pull/2549#issuecomment-422548635
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422511361
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia removed a comment on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia removed a comment on issue #2503: add auto ack and timeout 
configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422284349
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file

2018-09-18 Thread GitBox
srkukarni commented on a change in pull request #2593: Add support for running 
python functions with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r218534628
 
 

 ##
 File path: pulsar-functions/instance/src/main/python/python_instance_main.py
 ##
 @@ -74,6 +73,15 @@ def main():
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
   json_format.Parse(args.function_details, function_details)
+
+  if function_details.runtime == 
Function_pb2.FunctionDetails.Runtime.Value("PYTHON_WHEEL"):
+try:
+  util.install_wheel(str(args.py))
 
 Review comment:
   I've redone that part with installing using virtualenv to minimize impact on 
the system and needing permissions


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
srkukarni commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422488387
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2593: Add support for running python functions with wheel file

2018-09-18 Thread GitBox
sijie commented on a change in pull request #2593: Add support for running 
python functions with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r218530450
 
 

 ##
 File path: pulsar-functions/instance/src/main/python/python_instance_main.py
 ##
 @@ -74,6 +73,15 @@ def main():
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
   json_format.Parse(args.function_details, function_details)
+
+  if function_details.runtime == 
Function_pb2.FunctionDetails.Runtime.Value("PYTHON_WHEEL"):
+try:
+  util.install_wheel(str(args.py))
 
 Review comment:
   I am not familiar with python. but isn't that manipulating the environment, 
and doesn't it require sudos and such?



This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
srkukarni commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422483756
 
 
   run integration tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie opened a new pull request #2604: [tests] remove PersistentQueueE2ETest#testConsumersWithDifferentPermits

2018-09-18 Thread GitBox
sijie opened a new pull request #2604: [tests] remove 
PersistentQueueE2ETest#testConsumersWithDifferentPermits
URL: https://github.com/apache/incubator-pulsar/pull/2604
 
 
   *Motivation*
   
   There is no ways to gurantee how fast a consumer can consume. so it makes no 
sense to compare the different permits
   between two consumers.
   
   *Changes*
   
   Remove PersistentQueueE2ETest#testConsumersWithDifferentPermits
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one

2018-09-18 Thread GitBox
merlimat commented on issue #2580: [Python] Consolidated duplicated 
subscribe_*() methods into a single one
URL: https://github.com/apache/incubator-pulsar/pull/2580#issuecomment-422482216
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
sijie commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422478082
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on issue #2590: Issue #2584: unacked message is not redelivered on time

2018-09-18 Thread GitBox
jiazhai commented on issue #2590: Issue #2584: unacked message is not 
redelivered on time
URL: https://github.com/apache/incubator-pulsar/pull/2590#issuecomment-422432920
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #2591: Fix: Compaction with last deleted keys not completing compaction

2018-09-18 Thread GitBox
sijie closed pull request #2591: Fix: Compaction with last deleted keys not 
completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710249..425e04921b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
 @Override
 protected CompletableFuture doCompaction(RawReader reader, 
BookKeeper bk) {
 return phaseOne(reader).thenCompose(
-(r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, 
r.latestForKey, bk));
 }
 
 private CompletableFuture phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
 } else {
 log.info("Commencing phase one of compaction for {}, 
reading to {}",
  reader.getTopic(), lastMessageId);
-phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
+phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastMessageId, latestForKey,
+loopPromise);
 }
 });
 return loopPromise;
@@ -98,6 +99,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
 
 private void phaseOneLoop(RawReader reader,
   Optional firstMessageId,
+  Optional toMessageId,
   MessageId lastMessageId,
   Map latestForKey,
   CompletableFuture loopPromise) {
@@ -114,6 +116,7 @@ private void phaseOneLoop(RawReader reader,
 return;
 }
 MessageId id = m.getMessageId();
+boolean deletedMessage = false;
 if (RawBatchConverter.isReadableBatch(m)) {
 try {
 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ private void phaseOneLoop(RawReader reader,
 } else {
 Pair keyAndSize = 
extractKeyAndSize(m);
 if (keyAndSize != null) {
-latestForKey.put(keyAndSize.getLeft(), id);
+if(keyAndSize.getRight() > 0) {
+latestForKey.put(keyAndSize.getLeft(), 
id);
+} else {
+deletedMessage = true;
+latestForKey.remove(keyAndSize.getLeft());
+}
 }
 }
 
+MessageId first = firstMessageId.orElse(deletedMessage 
? null : id);
+MessageId to = deletedMessage ? 
toMessageId.orElse(null) : id;
 if (id.compareTo(lastMessageId) == 0) {
-loopPromise.complete(new 
PhaseOneResult(firstMessageId.orElse(id),
-id, 
latestForKey));
+loopPromise.complete(new PhaseOneResult(first, to, 
lastMessageId, latestForKey));
 } else {
 phaseOneLoop(reader,
- 
Optional.of(firstMessageId.orElse(id)),
+ Optional.ofNullable(first),
+ Optional.ofNullable(to),
  lastMessageId,
  latestForKey, loopPromise);
 }
@@ -153,40 +163,38 @@ private void 
scheduleTimeout(CompletableFuture future) {
 });
 }
 
-private CompletableFuture phaseTwo(RawReader reader, MessageId from, 
MessageId to,
- Map 
latestForKey, BookKeeper bk) {
+private CompletableFuture phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
+Map latestForKey, BookKeeper bk) {
 Map metadata = ImmutableMap.of("compactedTopic", 
reader.getTopic().getBytes(UTF_8),
- 

[incubator-pulsar] branch master updated: Fix: Compaction with last deleted keys not completing compaction (#2591)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d50617  Fix: Compaction with last deleted keys not completing 
compaction (#2591)
8d50617 is described below

commit 8d5061779e7a869e6e5981d2010e671ed0afa322
Author: Rajan Dhabalia 
AuthorDate: Tue Sep 18 03:09:50 2018 -0700

Fix: Compaction with last deleted keys not completing compaction (#2591)

### Motivation

Right now, topic-compaction ignores the message-id with empty payload but 
if the last message in the ledger has empty payload then compactor doesn't 
complete the compaction because compactor ignores last message and doesn't 
complete the result-future so, caller never sees complete result.

### Modifications

- Compactor calculates` from` and `to` position for compacted ledger 
according to last non-deleted active key.
- Compactor handles tail deleted keys from the ledger and completes 
compaction process gracefully.

### Result

compactor can successfully compact ledger whose last message is also 
deleted.
---
 .../pulsar/compaction/TwoPhaseCompactor.java   | 90 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++
 2 files changed, 111 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710..425e049 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class TwoPhaseCompactor extends Compactor {
 @Override
 protected CompletableFuture doCompaction(RawReader reader, 
BookKeeper bk) {
 return phaseOne(reader).thenCompose(
-(r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, 
r.latestForKey, bk));
 }
 
 private CompletableFuture phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public class TwoPhaseCompactor extends Compactor {
 } else {
 log.info("Commencing phase one of compaction for {}, 
reading to {}",
  reader.getTopic(), lastMessageId);
-phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
+phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastMessageId, latestForKey,
+loopPromise);
 }
 });
 return loopPromise;
@@ -98,6 +99,7 @@ public class TwoPhaseCompactor extends Compactor {
 
 private void phaseOneLoop(RawReader reader,
   Optional firstMessageId,
+  Optional toMessageId,
   MessageId lastMessageId,
   Map latestForKey,
   CompletableFuture loopPromise) {
@@ -114,6 +116,7 @@ public class TwoPhaseCompactor extends Compactor {
 return;
 }
 MessageId id = m.getMessageId();
+boolean deletedMessage = false;
 if (RawBatchConverter.isReadableBatch(m)) {
 try {
 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ public class TwoPhaseCompactor extends Compactor {
 } else {
 Pair keyAndSize = 
extractKeyAndSize(m);
 if (keyAndSize != null) {
-latestForKey.put(keyAndSize.getLeft(), id);
+if(keyAndSize.getRight() > 0) {
+latestForKey.put(keyAndSize.getLeft(), 
id);
+} else {
+deletedMessage = true;
+latestForKey.remove(keyAndSize.getLeft());
+}
 }
 }
 
+MessageId first = firstMessageId.orElse(deletedMessage 
? null : id);
+MessageId to = deletedMessage ? 
toMessageId.orElse(null) : id;
 if (id.compareTo(lastMessageId) == 0) {
-loopPromise.complete(new 
PhaseOneResult(firstMessageId.orElse(id),
-id, 
latestForKey));
+loopPromise.complete(new PhaseOneResult(first, to, 
lastMessageId, 

[GitHub] sijie commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
sijie commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422334472
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: [java-client] Issue #2384: ConnectionHandler: Log stack trace instead of printing (#2599)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new c866279  [java-client] Issue #2384: ConnectionHandler: Log stack trace 
instead of printing (#2599)
c866279 is described below

commit c86627948d4abf4b50517338c739f0dea8029849
Author: Sijie Guo 
AuthorDate: Tue Sep 18 03:05:40 2018 -0700

[java-client] Issue #2384: ConnectionHandler: Log stack trace instead of 
printing (#2599)

*Motivation*

Fixes #2384.

The pulsar java client is currently very noisy in case of connection loss 
and it cannot be muted, as the stack trace is printed with .printStackTrace(). 
Moving this to the logging system will allow developers to configure this 
behavior.

*Changes*

Remove `printStackTrace`.
---
 .../src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index d9bdc74..d04e8a8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -71,7 +71,6 @@ class ConnectionHandler {
 }
 
 private Void handleConnectionError(Throwable exception) {
-exception.printStackTrace();
 log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, 
state.getHandlerName(), exception.getMessage());
 connection.connectionFailed(new PulsarClientException(exception));
 



[GitHub] sijie closed issue #2384: [java-client] ConnectionHandler: Log stack trace instead of printing

2018-09-18 Thread GitBox
sijie closed issue #2384: [java-client] ConnectionHandler: Log stack trace 
instead of printing
URL: https://github.com/apache/incubator-pulsar/issues/2384
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie closed pull request #2599: [java-client] Issue #2384: ConnectionHandler: Log stack trace instead of printing

2018-09-18 Thread GitBox
sijie closed pull request #2599: [java-client] Issue #2384: ConnectionHandler: 
Log stack trace instead of printing
URL: https://github.com/apache/incubator-pulsar/pull/2599
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index d9bdc74faa..d04e8a847a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -71,7 +71,6 @@ protected void grabCnx() {
 }
 
 private Void handleConnectionError(Throwable exception) {
-exception.printStackTrace();
 log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, 
state.getHandlerName(), exception.getMessage());
 connection.connectionFailed(new PulsarClientException(exception));
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…

2018-09-18 Thread GitBox
sijie commented on issue #2543: Add ServiceUrlProvider and add method 
forceCloseConnection in PulsarC…
URL: https://github.com/apache/incubator-pulsar/pull/2543#issuecomment-422321283
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: [tests] Flaky Test ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache (#2598)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c07c81   [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache (#2598)
6c07c81 is described below

commit 6c07c81bf85eaa6d2a23c026049c57305284876e
Author: Sijie Guo 
AuthorDate: Tue Sep 18 02:18:17 2018 -0700

 [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache (#2598)

* [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache

*Motivation*

The problem is zookeeper watcher notification is non-deterministic. That 
says if you create N paths in zookeeper, you might receive
x notification. x is between 1 and N. so the test is written in a very 
non-deterministic way.

```
java.lang.AssertionError: expected [1] but found [2]
at org.testng.Assert.fail(Assert.java:96)
at org.testng.Assert.failNotEquals(Assert.java:776)
at org.testng.Assert.assertEqualsImpl(Assert.java:137)
at org.testng.Assert.assertEquals(Assert.java:118)
at org.testng.Assert.assertEquals(Assert.java:652)
at org.testng.Assert.assertEquals(Assert.java:662)
at 
org.apache.pulsar.zookeeper.ZookeeperCacheTest.testChildrenCacheZnodeCreatedAfterCache(ZookeeperCacheTest.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at 
org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
at 
org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

*Changes*

rewrite the test to make it more robust.
---
 .../test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 09c3ea1..39f23ac 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -209,12 +209,14 @@ public class ZookeeperCacheTest {
 Thread.sleep(1);
 }
 
+final int recvNotifications = notificationCount.get();
+
 assertEquals(cache.get(), new 
TreeSet(Lists.newArrayList("z1")));
 assertEquals(cache.get("/test"), new 
TreeSet(Lists.newArrayList("z1")));
-assertEquals(notificationCount.get(), 1);
+assertTrue(recvNotifications == 1 || recvNotifications == 2);
 
 zkClient.delete("/test/z1", -1);
-while (notificationCount.get() < 2) {
+while (notificationCount.get() < (recvNotifications + 1)) {
 Thread.sleep(1);
 }
 
@@ -230,7 +232,7 @@ public class ZookeeperCacheTest {
 // Ok
 }
 
-assertEquals(notificationCount.get(), 2);
+assertEquals(notificationCount.get(), (recvNotifications + 1));
 }
 
 @Test(timeOut = 1)



[GitHub] sijie closed pull request #2598: [tests] Flaky Test ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache

2018-09-18 Thread GitBox
sijie closed pull request #2598:  [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache
URL: https://github.com/apache/incubator-pulsar/pull/2598
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 09c3ea1524..39f23ac59f 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -209,12 +209,14 @@ void testChildrenCacheZnodeCreatedAfterCache() throws 
Exception {
 Thread.sleep(1);
 }
 
+final int recvNotifications = notificationCount.get();
+
 assertEquals(cache.get(), new 
TreeSet(Lists.newArrayList("z1")));
 assertEquals(cache.get("/test"), new 
TreeSet(Lists.newArrayList("z1")));
-assertEquals(notificationCount.get(), 1);
+assertTrue(recvNotifications == 1 || recvNotifications == 2);
 
 zkClient.delete("/test/z1", -1);
-while (notificationCount.get() < 2) {
+while (notificationCount.get() < (recvNotifications + 1)) {
 Thread.sleep(1);
 }
 
@@ -230,7 +232,7 @@ void testChildrenCacheZnodeCreatedAfterCache() throws 
Exception {
 // Ok
 }
 
-assertEquals(notificationCount.get(), 2);
+assertEquals(notificationCount.get(), (recvNotifications + 1));
 }
 
 @Test(timeOut = 1)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch asf-site updated: Updated site at revision aebfbba

2018-09-18 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 371973f  Updated site at revision aebfbba
371973f is described below

commit 371973f2f2d97154c06ff5818911056c13af11d4
Author: jenkins 
AuthorDate: Tue Sep 18 08:43:51 2018 +

Updated site at revision aebfbba
---
 .../docs/latest/adaptors/PulsarSpark/index.html|  8 ++---
 .../docs/latest/adaptors/PulsarStorm/index.html|  6 ++--
 content/docs/latest/admin-api/overview/index.html  | 10 +++---
 content/docs/latest/clients/Cpp/index.html |  8 ++---
 content/docs/latest/clients/Java/index.html| 14 
 content/docs/latest/clients/Python/index.html  | 10 +++---
 content/docs/latest/clients/WebSocket/index.html   |  8 ++---
 content/docs/latest/clients/go/index.html  |  6 ++--
 .../docs/latest/cookbooks/Encryption/index.html|  6 ++--
 .../latest/cookbooks/PartitionedTopics/index.html  | 14 
 .../latest/cookbooks/RetentionExpiry/index.html| 14 
 .../docs/latest/cookbooks/compaction/index.html|  6 ++--
 .../cookbooks/message-deduplication/index.html | 10 +++---
 .../docs/latest/cookbooks/message-queue/index.html | 14 
 .../latest/cookbooks/tiered-storage/index.html |  4 +--
 .../docs/latest/deployment/Kubernetes/index.html   |  4 +--
 .../docs/latest/deployment/aws-cluster/index.html  |  4 +--
 content/docs/latest/deployment/cluster/index.html  |  4 +--
 content/docs/latest/deployment/instance/index.html |  4 +--
 .../ConceptsAndArchitecture/index.html |  4 +--
 .../latest/getting-started/LocalCluster/index.html |  4 +--
 .../latest/getting-started/Pulsar-2.0/index.html   |  2 +-
 .../docs/latest/getting-started/docker/index.html  |  4 +--
 .../docs/latest/project/BinaryProtocol/index.html  |  4 +--
 content/docs/latest/project/CompileCpp/index.html  |  8 ++---
 .../docs/latest/project/SimulationTools/index.html |  2 +-
 .../docs/latest/project/schema-storage/index.html  |  4 +--
 content/docs/latest/reference/CliTools/index.html  | 18 +-
 .../docs/latest/security/authorization/index.html  | 10 +++---
 content/docs/latest/security/encryption/index.html |  6 ++--
 content/img/docusaurus.svg |  2 +-
 content/img/pulsar.svg |  2 +-
 content/ja/adaptors/PulsarSpark/index.html |  8 ++---
 content/ja/adaptors/PulsarStorm/index.html |  6 ++--
 content/ja/admin/AdminInterface/index.html | 12 +++
 content/ja/admin/Authz/index.html  | 12 +++
 content/ja/admin/ClustersBrokers/index.html|  6 ++--
 content/ja/admin/PropertiesNamespaces/index.html   |  6 ++--
 content/ja/advanced/PartitionedTopics/index.html   | 12 +++
 content/ja/advanced/RetentionExpiry/index.html | 12 +++
 content/ja/clients/Cpp/index.html  |  6 ++--
 content/ja/clients/Java/index.html |  8 ++---
 content/ja/clients/Python/index.html   |  8 ++---
 content/ja/clients/WebSocket/index.html|  8 ++---
 content/ja/deployment/InstanceSetup/index.html |  6 ++--
 content/ja/deployment/Kubernetes/index.html|  4 +--
 .../ConceptsAndArchitecture/index.html |  2 +-
 content/ja/getting-started/LocalCluster/index.html |  4 +--
 content/ja/project/BinaryProtocol/index.html   |  4 +--
 content/ja/project/SimulationTools/index.html  |  2 +-
 content/ja/reference/CliTools/index.html   | 18 +-
 content/swagger/swagger.json   | 40 +++---
 52 files changed, 204 insertions(+), 204 deletions(-)

diff --git a/content/docs/latest/adaptors/PulsarSpark/index.html 
b/content/docs/latest/adaptors/PulsarSpark/index.html
index a30d928..d130108 100644
--- a/content/docs/latest/adaptors/PulsarSpark/index.html
+++ b/content/docs/latest/adaptors/PulsarSpark/index.html
@@ -1079,9 +1079,9 @@
   
   
   
+  Spark Streaming 
Pulsar receiver
   
   
-  Spark Streaming 
Pulsar receiver
   
   
   
@@ -1325,9 +1325,9 @@
   
   
   
+  Spark Streaming 
Pulsar receiver
   
   
-  Spark Streaming 
Pulsar receiver
   
   
   
@@ -1537,8 +1537,6 @@
   
   
   
-  
-  
   The Pulsar Java 
client
   
   
@@ -1617,6 +1615,8 @@
   
   
   
+  
+  
   Authentication 
and authorization in Pulsar
   
   
diff --git a/content/docs/latest/adaptors/PulsarStorm/index.html 
b/content/docs/latest/adaptors/PulsarStorm/index.html
index 3059cf5..67a4256 100644
--- 

[GitHub] sijie commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
sijie commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422290633
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2587: [tests] improve connector related integration tests

2018-09-18 Thread GitBox
sijie commented on issue #2587: [tests] improve connector related integration 
tests
URL: https://github.com/apache/incubator-pulsar/pull/2587#issuecomment-422290835
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2598: [tests] Flaky Test ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache

2018-09-18 Thread GitBox
sijie commented on issue #2598:  [tests] Flaky Test 
ZooKeeperCacheTest#testChildrenCacheZnodeCreatedAfterCache
URL: https://github.com/apache/incubator-pulsar/pull/2598#issuecomment-422290404
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on issue #2599: [java-client] Issue #2384: ConnectionHandler: Log stack trace instead of printing

2018-09-18 Thread GitBox
sijie commented on issue #2599: [java-client] Issue #2384: ConnectionHandler: 
Log stack trace instead of printing
URL: https://github.com/apache/incubator-pulsar/pull/2599#issuecomment-422289737
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch master updated: [tests] make PersistentQueueE2ETest tolerant failures during deleting topic at cleanup (#2600)

2018-09-18 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new aebfbba  [tests] make PersistentQueueE2ETest tolerant failures during 
deleting topic at cleanup (#2600)
aebfbba is described below

commit aebfbba2a9f2882944bd1854672f4538a9d27177
Author: Sijie Guo 
AuthorDate: Tue Sep 18 00:43:50 2018 -0700

[tests] make PersistentQueueE2ETest tolerant failures during deleting topic 
at cleanup (#2600)

*Motivation*

"Topic has active producers/subscriptions" can be thrown during deleting 
topic at cleanup.
This kind of error can be ignored during cleaning up to make the tests more 
robust.

*Changes*

Make PersistentQueueE2ETest toelrant failures during cleaning up topics.
---
 .../broker/service/PersistentQueueE2ETest.java | 23 +-
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index c41a07a..2937ca0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
@@ -76,6 +77,14 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 
 private static final Logger log = 
LoggerFactory.getLogger(PersistentQueueE2ETest.class);
 
+private void deleteTopic(String topicName) {
+try {
+admin.topics().delete(topicName);
+} catch (PulsarAdminException pae) {
+// it is okay to get exception if it is cleaning up.
+}
+}
+
 @Test
 public void testSimpleConsumerEvents() throws Exception {
 final String topicName = "persistent://prop/use/ns-abc/shared-topic1";
@@ -172,7 +181,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 producer.close();
 consumer2.close();
 
-admin.topics().delete(topicName);
+deleteTopic(topicName);
 }
 
 @Test
@@ -220,7 +229,8 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 assertTrue(CollectionUtils.subtract(messagesProduced, 
messagesConsumed).isEmpty());
 
 consumer1.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 @Test
@@ -279,7 +289,8 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 
 consumer1.close();
 consumer2.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
@@ -352,7 +363,8 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 consumer1.close();
 consumer2.close();
 consumer3.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 @Test(timeOut = 30)
@@ -550,6 +562,7 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
 producer.close();
 consumer1.close();
 consumer2.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 }



[GitHub] sijie closed pull request #2600: [tests] make PersistentQueueE2ETest tolerant failures during deleting topic at cleanup

2018-09-18 Thread GitBox
sijie closed pull request #2600: [tests] make PersistentQueueE2ETest tolerant 
failures during deleting topic at cleanup
URL: https://github.com/apache/incubator-pulsar/pull/2600
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index c41a07a2cb..2937ca0caf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -35,6 +35,7 @@
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
@@ -76,6 +77,14 @@ protected void cleanup() throws Exception {
 
 private static final Logger log = 
LoggerFactory.getLogger(PersistentQueueE2ETest.class);
 
+private void deleteTopic(String topicName) {
+try {
+admin.topics().delete(topicName);
+} catch (PulsarAdminException pae) {
+// it is okay to get exception if it is cleaning up.
+}
+}
+
 @Test
 public void testSimpleConsumerEvents() throws Exception {
 final String topicName = "persistent://prop/use/ns-abc/shared-topic1";
@@ -172,7 +181,7 @@ public void testSimpleConsumerEvents() throws Exception {
 producer.close();
 consumer2.close();
 
-admin.topics().delete(topicName);
+deleteTopic(topicName);
 }
 
 @Test
@@ -220,7 +229,8 @@ public void testReplayOnConsumerDisconnect() throws 
Exception {
 assertTrue(CollectionUtils.subtract(messagesProduced, 
messagesConsumed).isEmpty());
 
 consumer1.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 @Test
@@ -279,7 +289,8 @@ public void testConsumersWithDifferentPermits() throws 
Exception {
 
 consumer1.close();
 consumer2.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
@@ -352,7 +363,8 @@ public void testRoundRobinBatchDistribution() throws 
Exception {
 consumer1.close();
 consumer2.close();
 consumer3.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 
 @Test(timeOut = 30)
@@ -550,6 +562,7 @@ public void testUnackedCountWithRedeliveries() throws 
Exception {
 producer.close();
 consumer1.close();
 consumer2.close();
-admin.topics().delete(topicName);
+
+deleteTopic(topicName);
 }
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422284349
 
 
   rerun java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2603: Share EventLoopGroup between Broker and BK client

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2603: Share EventLoopGroup 
between Broker and BK client
URL: https://github.com/apache/incubator-pulsar/pull/2603#discussion_r218323237
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
 ##
 @@ -28,6 +30,6 @@
  * Provider of a new BookKeeper client instance
  */
 public interface BookKeeperClientFactory {
-BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws 
IOException;
+BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, 
EventLoopGroup eventLoopGroup) throws IOException;
 
 Review comment:
   yes, this will help in avoiding context switching while publish and 
dispatching messages. however, do you see contention issue where high 
dispatching can impact publish?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services