[GitHub] merlimat closed pull request #2572: Shorten the timeout value of C++ ZTS client

2018-09-19 Thread GitBox
merlimat closed pull request #2572: Shorten the timeout value of C++ ZTS client
URL: https://github.com/apache/incubator-pulsar/pull/2572
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 93010affb8..3097c30c65 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -45,7 +45,7 @@ namespace pulsar {
 
 const static std::string DEFAULT_PRINCIPAL_HEADER = "Athenz-Principal-Auth";
 const static std::string DEFAULT_ROLE_HEADER = "Athenz-Role-Auth";
-const static int REQUEST_TIMEOUT = 1;
+const static int REQUEST_TIMEOUT = 3;
 const static int DEFAULT_TOKEN_EXPIRATION_TIME_SEC = 3600;
 const static int MIN_TOKEN_EXPIRATION_TIME_SEC = 900;
 const static int MAX_HTTP_REDIRECTS = 20;
@@ -272,7 +272,7 @@ const std::string ZTSClient::getRoleToken() const {
 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
 
 // Timer
-curl_easy_setopt(handle, CURLOPT_TIMEOUT, REQUEST_TIMEOUT);
+curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, REQUEST_TIMEOUT);
 
 // Redirects
 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] david-streamlio opened a new pull request #2615: Azure offloader

2018-09-19 Thread GitBox
david-streamlio opened a new pull request #2615: Azure offloader
URL: https://github.com/apache/incubator-pulsar/pull/2615
 
 
   ### Motivation
   
   Added Azure offloader for Pulsar Tiered Storage
   
   ### Modifications
   
   Refactored the tiered storage configuration tree to make it easier to add 
other JCloud Storage Providers in the future
   
   ### Result
   
   Users will be able to use their Azure Storage accounts for tiered storage of 
Pulsar data.


[GitHub] jiazhai opened a new pull request #2614: Debezium: add PulsarDatabaseHistory for debezium

2018-09-19 Thread GitBox
jiazhai opened a new pull request #2614: Debezium: add PulsarDatabaseHistory 
for debezium
URL: https://github.com/apache/incubator-pulsar/pull/2614
 
 
   ### Motivation
   
   add PulsarDatabaseHistory for debezium
   
   ### Modifications
   
   Add PulsarDatabaseHistory for Debezium, plus a test for it.
   
   ### Result
   
   Unit tests pass.


[GitHub] merlimat closed pull request #2608: Enforce boost-python was found by CMake

2018-09-19 Thread GitBox
merlimat closed pull request #2608: Enforce boost-python was found by CMake
URL: https://github.com/apache/incubator-pulsar/pull/2608
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/docker/build-wheels.sh b/pulsar-client-cpp/docker/build-wheels.sh
index ebbb441cac..32925194ec 100755
--- a/pulsar-client-cpp/docker/build-wheels.sh
+++ b/pulsar-client-cpp/docker/build-wheels.sh
@@ -33,6 +33,7 @@ PYTHON_VERSIONS=(
'3.4 cp34-cp34m'
'3.5 cp35-cp35m'
'3.6 cp36-cp36m'
+   '3.7 cp37-cp37m'
 )
 
 function contains() {
diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt
index 9ddbc017c1..2c51f6d0b0 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -48,8 +48,17 @@ endif()
 set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY} ${Boost_PYTHON3_LIBRARY}
 ${Boost_PYTHON27-MT_LIBRARY} ${Boost_PYTHON37-MT_LIBRARY})
 
+if (APPLE)
+set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS}
+ ${Boost_PYTHON27-MT_LIBRARY_RELEASE} ${Boost_PYTHON37-MT_LIBRARY_RELEASE})
+endif()
+
 message(STATUS "Using Boost Python libs: ${PYTHON_WRAPPER_LIBS}")
 
+if (NOT PYTHON_WRAPPER_LIBS)
+MESSAGE(FATAL_ERROR "Could not find Boost Python library")
+endif ()
+
 if (APPLE)
 target_link_libraries(_pulsar -Wl,-all_load pulsarStatic ${PYTHON_WRAPPER_LIBS})
 else ()


 


[GitHub] rdhabalia opened a new pull request #2613: Avoid scheduling heartbeat function if owner-worker not available

2018-09-19 Thread GitBox
rdhabalia opened a new pull request #2613: Avoid scheduling heartbeat function 
if owner-worker not available
URL: https://github.com/apache/incubator-pulsar/pull/2613
 
 
   ### Motivation
   
   Handle heartbeat function if owner-worker is not available. 
   
   ### Modifications
   
   Function scheduling handles heartbeat function if owner-worker is not 
available.
   
   


[GitHub] jiazhai commented on issue #2590: Issue #2584: unacked message is not redelivered on time

2018-09-19 Thread GitBox
jiazhai commented on issue #2590: Issue #2584: unacked message is not 
redelivered on time
URL: https://github.com/apache/incubator-pulsar/pull/2590#issuecomment-422994297
 
 
   Seems some unit tests need changes; will look into it later.


[GitHub] jiazhai removed a comment on issue #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
jiazhai removed a comment on issue #2606: Add support for dead letter topics 
for java functions
URL: https://github.com/apache/incubator-pulsar/pull/2606#issuecomment-422994155
 
 
   Seems some unit tests need changes; will look into it later.


[GitHub] jiazhai commented on issue #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
jiazhai commented on issue #2606: Add support for dead letter topics for java 
functions
URL: https://github.com/apache/incubator-pulsar/pull/2606#issuecomment-422994155
 
 
   Seems some unit tests need changes; will look into it later.


[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422990454
 
 
   rerun integration tests


[GitHub] jerrypeng commented on a change in pull request #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
jerrypeng commented on a change in pull request #2593: Add support for running 
python functions with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r218982622
 
 

 ##
 File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 ##
 @@ -229,6 +229,11 @@ void processArguments() throws Exception {
 description = "Path to the main Python file for the function (if the function is written in Python)",
 listConverter = StringConverter.class)
 protected String pyFile;
+@Parameter(
+names = "--pywheel",
+description = "Path to the Python wheel file for the function (if the function is submitted as Python Wheel)",
+listConverter = StringConverter.class)
 
 Review comment:
   why do we need a listConverter?


[GitHub] sijie closed issue #2007: Implement an easier way to get the topic information from consumed message

2018-09-19 Thread GitBox
sijie closed issue #2007: Implement an easier way to get the topic information 
from consumed message
URL: https://github.com/apache/incubator-pulsar/issues/2007
 
 
   


[GitHub] sijie commented on issue #2007: Implement an easier way to get the topic information from consumed message

2018-09-19 Thread GitBox
sijie commented on issue #2007: Implement an easier way to get the topic 
information from consumed message
URL: https://github.com/apache/incubator-pulsar/issues/2007#issuecomment-422970257
 
 
   This has been implemented and merged.


[GitHub] sijie commented on issue #2009: Unable to get messages from non-persistent topics

2018-09-19 Thread GitBox
sijie commented on issue #2009: Unable to get messages from non-persistent 
topics
URL: https://github.com/apache/incubator-pulsar/issues/2009#issuecomment-422970028
 
 
   This is being fixed by #2025 and will be included in the upcoming release.


[GitHub] dsambandam commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1

2018-09-19 Thread GitBox
dsambandam commented on issue #2576: Pulsar client consume command fails with 
java.lang.IllegalArgumentException: port out of range:-1
URL: https://github.com/apache/incubator-pulsar/issues/2576#issuecomment-422969989
 
 
   @sijie I don't have any special characters in my service URLs. If that were 
the cause, shouldn't my command that runs with no authentication enabled also 
fail? That one completes successfully with no issues.


[GitHub] sijie commented on issue #2013: Unable to consume messages from a partition

2018-09-19 Thread GitBox
sijie commented on issue #2013: Unable to consume messages from a partition
URL: https://github.com/apache/incubator-pulsar/issues/2013#issuecomment-422969773
 
 
   I think this is a duplicate of #2431; closing this one.


[GitHub] sijie closed issue #2013: Unable to consume messages from a partition

2018-09-19 Thread GitBox
sijie closed issue #2013: Unable to consume messages from a partition
URL: https://github.com/apache/incubator-pulsar/issues/2013
 
 
   


[GitHub] sijie commented on issue #2033: Python pulsar-client lacks multi-topic support

2018-09-19 Thread GitBox
sijie commented on issue #2033: Python pulsar-client lacks multi-topic support
URL: https://github.com/apache/incubator-pulsar/issues/2033#issuecomment-422969364
 
 
   Multi-topic and regex subscription support has been added to Python. It was 
actually released in 2.1.1 and will also be in 2.2.0.


[GitHub] sijie closed issue #2033: Python pulsar-client lacks multi-topic support

2018-09-19 Thread GitBox
sijie closed issue #2033: Python pulsar-client lacks multi-topic support
URL: https://github.com/apache/incubator-pulsar/issues/2033
 
 
   


[GitHub] sijie commented on issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener

2018-09-19 Thread GitBox
sijie commented on issue #2574: Timeout message not get redeliver in 
TopicsConsumer when use message listener
URL: https://github.com/apache/incubator-pulsar/issues/2574#issuecomment-422968685
 
 
   This is fixed by #2583 


[GitHub] sijie commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1

2018-09-19 Thread GitBox
sijie commented on issue #2576: Pulsar client consume command fails with 
java.lang.IllegalArgumentException: port out of range:-1
URL: https://github.com/apache/incubator-pulsar/issues/2576#issuecomment-422968544
 
 
   @dsambandam I think the exception is quite similar to the issue reported 
here: #1022. Can you inspect your TLS-related service URLs and make sure they 
don't have any special characters at the end of the hostname?
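
One way a `port out of range:-1` message can arise in Java is sketched below. This is an assumption for illustration, not a confirmed diagnosis of this issue: `java.net.URI` falls back to a registry-based authority when the hostname contains a character it does not accept (an underscore here), so `getPort()` returns -1, and `InetSocketAddress` then rejects that port. The class name, the helper `describe`, and the sample URLs are hypothetical and are not Pulsar code.

```java
import java.net.InetSocketAddress;
import java.net.URI;

// Hypothetical sketch, not Pulsar code: shows how an unparsable hostname
// can surface later as "port out of range:-1".
final class PortOutOfRangeSketch {

    // Returns "ok" if the port taken from the URL is usable,
    // otherwise the message of the IllegalArgumentException.
    static String describe(String serviceUrl) {
        URI uri = URI.create(serviceUrl);
        try {
            // The host is a fixed placeholder; only the parsed port matters here.
            InetSocketAddress.createUnresolved("placeholder", uri.getPort());
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Hostname made only of letters, digits and hyphens: the port is parsed.
        System.out.println(describe("pulsar+ssl://broker-1:6651"));
        // Underscore in the hostname: URI.getPort() yields -1.
        System.out.println(describe("pulsar+ssl://broker_1:6651"));
    }
}
```

If this is the mechanism at play, printing `URI.create(url).getHost()` and `getPort()` for each configured service URL would show which one fails to parse.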


[GitHub] sijie closed issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener

2018-09-19 Thread GitBox
sijie closed issue #2574: Timeout message not get redeliver in TopicsConsumer 
when use message listener
URL: https://github.com/apache/incubator-pulsar/issues/2574
 
 
   


[GitHub] sijie commented on issue #1016: Support bytes message key

2018-09-19 Thread GitBox
sijie commented on issue #1016: Support bytes message key
URL: https://github.com/apache/incubator-pulsar/issues/1016#issuecomment-422966131
 
 
   this is being implemented in #2612 


[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422965788
 
 
   run java8 tests


[GitHub] aahmed-se commented on issue #2504: Add Presto Sql Test

2018-09-19 Thread GitBox
aahmed-se commented on issue #2504: Add Presto Sql Test
URL: https://github.com/apache/incubator-pulsar/pull/2504#issuecomment-422963308
 
 
   retest this please


[GitHub] aahmed-se commented on issue #2578: Add support for schema extraction from a jar

2018-09-19 Thread GitBox
aahmed-se commented on issue #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#issuecomment-422963330
 
 
   retest this please


[GitHub] aahmed-se commented on issue #2611: Make dockerUtils use container name for test exec

2018-09-19 Thread GitBox
aahmed-se commented on issue #2611: Make dockerUtils use container name for 
test exec
URL: https://github.com/apache/incubator-pulsar/pull/2611#issuecomment-422962898
 
 
   The log configuration does not; the DockerUtils changes fix that issue.


[GitHub] sijie commented on issue #2611: Make dockerUtils use container name for test exec

2018-09-19 Thread GitBox
sijie commented on issue #2611: Make dockerUtils use container name for test 
exec
URL: https://github.com/apache/incubator-pulsar/pull/2611#issuecomment-422954395
 
 
   >  we are reducing the number of log line elements since we are not looking 
at multithreaded server components but single threaded test instances
   
   Well, I think it is debatable. More information generally helps with 
debugging. Is there any particular reason that you have to reduce the 
log line elements?
   
   > also we are making the log files easier to follow by using the 
container name
   
   How does the log configuration help with that?


[GitHub] sijie commented on a change in pull request #2612: Allow byte[] keys for messages (#1016)

2018-09-19 Thread GitBox
sijie commented on a change in pull request #2612: Allow byte[] keys for 
messages (#1016)
URL: https://github.com/apache/incubator-pulsar/pull/2612#discussion_r218956364
 
 

 ##
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
 ##
 @@ -89,6 +89,14 @@
  */
 TypedMessageBuilder key(String key);
 
+/**
+ * Sets the bytes of the key of the message for routing policy.
+ * Internally the bytes will be base64 encoded.
+ *
+ * @param key routing key for message, in byte array form
+ */
+TypedMessageBuilder keyBytes(byte[] key);
 
 Review comment:
   Any particular reason for naming this `keyBytes` instead of `key`?


[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422942590
 
 
   run java8 tests


[GitHub] kramasamy commented on a change in pull request #2609: Initial commit for a go based pulsar cli.

2018-09-19 Thread GitBox
kramasamy commented on a change in pull request #2609: Initial commit for a go 
based pulsar cli.
URL: https://github.com/apache/incubator-pulsar/pull/2609#discussion_r218946596
 
 

 ##
 File path: cli/admin/clusters.go
 ##
 @@ -0,0 +1,62 @@
+//
 
 Review comment:
   Since cluster create/delete/modify is used mainly for replication, it 
might be good to create a replication command group and put these underneath.


[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422936960
 
 
   run java8 tests


[GitHub] ivankelly opened a new pull request #2612: Allow byte[] keys for messages (#1016)

2018-09-19 Thread GitBox
ivankelly opened a new pull request #2612: Allow byte[] keys for messages 
(#1016)
URL: https://github.com/apache/incubator-pulsar/pull/2612
 
 
   Sometimes it can be useful to send something more complex than a
   string as the key of the message. However, early on Pulsar chose to
   make String the only way to send a key, and this permeates throughout
   the code, so we can't very well change it now.
   
   This patch adds rudimentary byte[] key support. If a user adds a
   byte[] key, the byte[] is base64 encoded and stored in the normal key
   field. We also send a flag to denote that it is base64 encoded, so the
   receiving end knows to decode it correctly. There's no schema or
   anything attached to this. Any SerDe has to be handled manually by the
   client.
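
A minimal sketch of the encoding scheme described above, assuming a metadata record that holds a String key plus a boolean flag. The names `Base64KeySketch`, `KeyField`, `fromString`, `fromBytes`, and `toBytes` are hypothetical and are not taken from the patch; only `java.util.Base64` is a real API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical sketch, not the patch itself: a byte[] key is carried in the
// existing String key field, with a flag telling the receiver to decode it.
final class Base64KeySketch {

    // Stand-in for the message metadata: the String key and the base64 flag.
    static final class KeyField {
        final String key;
        final boolean base64Encoded;

        KeyField(String key, boolean base64Encoded) {
            this.key = key;
            this.base64Encoded = base64Encoded;
        }
    }

    // Plain String keys are stored unchanged, flag unset.
    static KeyField fromString(String key) {
        return new KeyField(key, false);
    }

    // byte[] keys are base64 encoded into the String field, flag set.
    static KeyField fromBytes(byte[] key) {
        return new KeyField(Base64.getEncoder().encodeToString(key), true);
    }

    // Receiving side: decode only when the flag says the key is base64.
    static byte[] toBytes(KeyField field) {
        return field.base64Encoded
                ? Base64.getDecoder().decode(field.key)
                : field.key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = {0x00, (byte) 0xff, 0x10};
        KeyField field = fromBytes(raw);
        System.out.println("stored key: " + field.key);
        System.out.println("round trip ok: " + Arrays.equals(raw, toBytes(field)));
    }
}
```

The flag matters because a plain String key can itself look like valid base64, so the receiver cannot tell the two cases apart from the key text alone.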
   


[GitHub] sijie opened a new issue #1016: Support bytes message key

2018-09-19 Thread GitBox
sijie opened a new issue #1016: Support bytes message key
URL: https://github.com/apache/incubator-pulsar/issues/1016
 
 
   Currently the message key is a string. It would be good to also allow the key 
to be backed by bytes, so that people can configure a SerDe on the message key, 
which the message router can use to compute which partition a message should be 
routed to.
   
   It should be considered together with schema registry.


[GitHub] aahmed-se commented on issue #2611: Make dockerUtils use container name for test exec

2018-09-19 Thread GitBox
aahmed-se commented on issue #2611: Make dockerUtils use container name for 
test exec
URL: https://github.com/apache/incubator-pulsar/pull/2611#issuecomment-422930867
 
 
   We are overriding that. We are reducing the number of log line elements 
since we are not looking at multithreaded server components but single-threaded 
test instances. We are also making the log files easier to follow by using the 
container name.


[GitHub] sijie commented on a change in pull request #2611: Make dockerUtils use container name for test exec

2018-09-19 Thread GitBox
sijie commented on a change in pull request #2611: Make dockerUtils use 
container name for test exec
URL: https://github.com/apache/incubator-pulsar/pull/2611#discussion_r218935033
 
 

 ##
 File path: tests/integration/src/test/resources/log4j2.xml
 ##
 @@ -0,0 +1,33 @@
+
 
 Review comment:
   why do we need this?
   
   the logging is already configured - 
https://github.com/apache/incubator-pulsar/blob/master/buildtools/src/main/resources/log4j2.xml
   
   you can enable logging to console by specifying 
`-DredirectTestOutputToFile=false`
   
   https://github.com/apache/incubator-pulsar/blob/master/pom.xml#L132
   


[GitHub] aahmed-se opened a new pull request #2611: Make dockerUtils use container name for test exec

2018-09-19 Thread GitBox
aahmed-se opened a new pull request #2611: Make dockerUtils use container name 
for test exec
URL: https://github.com/apache/incubator-pulsar/pull/2611
 
 
   Provides easier-to-follow logs. We also reduce the log output via a custom 
log config for the integration tests.


[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422919735
 
 
   run java8 tests


[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422915657
 
 
   @rdhabalia I will write some tests


[GitHub] aahmed-se commented on issue #2607: Add default value for test image org tag

2018-09-19 Thread GitBox
aahmed-se commented on issue #2607: Add default value for test image org tag
URL: https://github.com/apache/incubator-pulsar/pull/2607#issuecomment-422913594
 
 
   You are right, will close this.


[GitHub] aahmed-se closed pull request #2607: Add default value for test image org tag

2018-09-19 Thread GitBox
aahmed-se closed pull request #2607: Add default value for test image org tag
URL: https://github.com/apache/incubator-pulsar/pull/2607
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/docker-images/latest-version-image/pom.xml b/tests/docker-images/latest-version-image/pom.xml
index b58530ce44..d484972a9a 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -29,6 +29,9 @@
   org.apache.pulsar.tests
   latest-version-image
   Apache Pulsar :: Tests :: Docker Images :: Latest Version 
Testing
+  
+apachepulsar
+  
   pom
 
   


 


[GitHub] merlimat opened a new pull request #2610: Prepare website for future releases without '-incubating' suffix

2018-09-19 Thread GitBox
merlimat opened a new pull request #2610: Prepare website for future releases 
without '-incubating' suffix
URL: https://github.com/apache/incubator-pulsar/pull/2610
 
 
   ### Motivation
   
   The `-incubating` suffix is a property of a particular release. We shouldn't 
add it to all releases in the download page. 




[GitHub] sijie commented on issue #2609: Initial commit for a go based pulsar cli.

2018-09-19 Thread GitBox
sijie commented on issue #2609: Initial commit for a go based pulsar cli.
URL: https://github.com/apache/incubator-pulsar/pull/2609#issuecomment-422902467
 
 
   @cckellogg Ideally yes, but `mvn license:format` should already be attaching 
the license headers.




[GitHub] cckellogg commented on issue #2609: Initial commit for a go based pulsar cli.

2018-09-19 Thread GitBox
cckellogg commented on issue #2609: Initial commit for a go based pulsar cli.
URL: https://github.com/apache/incubator-pulsar/pull/2609#issuecomment-422901452
 
 
   @sijie does a license need to be added to the source files only? Do the mod 
files and readme need a license too?




[GitHub] jerrypeng removed a comment on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng removed a comment on issue #2605: implement topic routing on a per 
record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897646
 
 
   rerun tests please




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422898762
 
 
   rerun integration tests 




[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422898627
 
 
   run cpp tests




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897997
 
 
   rerun java8 tests




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897646
 
 
   rerun tests please




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897747
 
 
   rerun cpp tests




[GitHub] jerrypeng removed a comment on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng removed a comment on issue #2605: implement topic routing on a per 
record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897519
 
 
   rerun tests




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422897519
 
 
   rerun tests




[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218900232
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,153 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
 
 Review comment:
   Yes, because a message can have any destination topic. At the time of 
processing, we may or may not have already created a producer for that topic. 
We cache producers so that we don't create a producer for each message and 
instead reuse existing ones. This is basically the same logic as in 
MultiConsumerProducer, used for effectively-once, where we have to have separate 
producers for each partition.
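
The caching described in this review comment can be sketched roughly as follows. This is a minimal illustration written in Python rather than the PR's Java; `FakeProducer`, `ProducerCache`, and the factory callback are hypothetical names and not part of Pulsar. It also assumes that a producer which loses the race to be cached is simply closed, which the truncated diff above does not show.

```python
import threading


class FakeProducer:
    """Hypothetical stand-in for a real producer, used only for illustration."""

    def __init__(self, topic):
        self.topic = topic
        self.closed = False

    def close(self):
        self.closed = True


class ProducerCache:
    """Keeps one producer per destination topic and reuses it across messages."""

    def __init__(self, create_producer):
        self._create_producer = create_producer  # callable: topic -> producer
        self._producers = {}
        self._lock = threading.Lock()

    def get_producer(self, topic):
        producer = self._producers.get(topic)
        if producer is not None:
            return producer  # fast path: a producer for this topic already exists
        # Create outside the lock, since creating a producer may be slow.
        new_producer = self._create_producer(topic)
        with self._lock:
            # Same idea as putIfAbsent: whichever producer was stored first wins.
            existing = self._producers.setdefault(topic, new_producer)
        if existing is not new_producer:
            # Assumption: the producer that lost the race is closed and dropped.
            new_producer.close()
        return existing
```

With this shape, the first message for a topic pays the cost of creating a producer and every later message for the same topic reuses it.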




[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422895917
 
 
   run cpp tests




[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218900232
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,153 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
 
 Review comment:
   Yes, because a message can have any destination topic. At the time of 
processing, we may or may not have already created a producer for that topic. 
We cache producers so that we don't create a producer for each message and 
instead reuse existing ones. This is basically the same logic as in 
MultiConsumerProducer, used for effectively-once, where we have to have separate 
producers for each partition.




[GitHub] jerrypeng edited a comment on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng edited a comment on issue #2605: implement topic routing on a per 
record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422892352
 
 
   @rdhabalia A use case we have is a source for ingesting data from 
databases. Users will often want to just specify a schema or db and have the 
data of all the tables underneath automatically be sent to corresponding topics 
in Pulsar. Thus, at the end of the day we will have a table -> topic mapping. 
This is for use cases where it's too cumbersome to explicitly specify some sort 
of mapping to topics, and also for use cases where you don't know which topic to 
send to until runtime or upon message inspection.
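
As a rough illustration of the table -> topic mapping described here, the destination topic can be derived from each record at runtime instead of being configured up front. The sketch below is hypothetical: the record fields (`database`, `table`), the function names, and the `db.table` naming scheme are assumptions made for the example, not something taken from the PR.

```python
def topic_for_record(record, tenant="public", namespace="default"):
    """Derive the destination topic from the record's own metadata."""
    # Assumed naming scheme: one topic per table, named "<database>.<table>".
    return "persistent://{}/{}/{}.{}".format(
        tenant, namespace, record["database"], record["table"])


def route(records):
    """Group records by the topic each one should be published to."""
    by_topic = {}
    for record in records:
        by_topic.setdefault(topic_for_record(record), []).append(record)
    return by_topic
```

No table list has to be declared in advance; a table that shows up for the first time just produces a new topic name.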




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-19 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422892352
 
 
   @rdhabalia A use case we have is a source for ingesting data from databases. 
Users will often want to just specify a schema or db and have the data of all 
the tables underneath automatically be sent to corresponding topics in Pulsar. 
Thus, at the end of the day we will have a table -> topic mapping. This is for 
use cases where it's too cumbersome to explicitly specify some sort of 
mapping to topics, and also for use cases where you don't know which topic to 
send to until runtime or upon message inspection.




[GitHub] merlimat closed pull request #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one

2018-09-19 Thread GitBox
merlimat closed pull request #2580: [Python] Consolidated duplicated 
subscribe_*() methods into a single one
URL: https://github.com/apache/incubator-pulsar/pull/2580
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 806c7e2032..6849ecc245 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -104,6 +104,9 @@ def send_callback(res, msg):
 from pulsar.functions.context import Context
 from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
 
+import re
+_retype = type(re.compile('x'))
+
 class MessageId:
 """
 Represents a message id
@@ -412,114 +415,19 @@ def subscribe(self, topic, subscription_name,
   unacked_messages_timeout_ms=None,
   broker_consumer_stats_cache_time_ms=3,
   is_read_compacted=False,
-  properties=None
+  properties=None,
+  pattern_auto_discovery_period=60
   ):
 """
 Subscribe to the given topic and subscription combination.
 
 **Args**
 
-* `topic`: The name of the topic.
-* `subscription`: The name of the subscription.
-
-**Options**
-
-* `consumer_type`:
-  Select the subscription type to be used when subscribing to the 
topic.
-* `message_listener`:
-  Sets a message listener for the consumer. When the listener is set,
-  the application will receive messages through it. Calls to
-  `consumer.receive()` will not be allowed. The listener function needs
-  to accept (consumer, message), for example:
-
-#!python
-def my_listener(consumer, message):
-# process message
-consumer.acknowledge(message)
-
-* `receiver_queue_size`:
-  Sets the size of the consumer receive queue. The consumer receive
-  queue controls how many messages can be accumulated by the consumer
-  before the application calls `receive()`. Using a higher value could
-  potentially increase the consumer throughput at the expense of higher
-  memory utilization. Setting the consumer queue size to zero decreases
-  the throughput of the consumer by disabling pre-fetching of messages.
-  This approach improves the message distribution on shared 
subscription
-  by pushing messages only to those consumers that are ready to process
-  them. Neither receive with timeout nor partitioned topics can be used
-  if the consumer queue size is zero. The `receive()` function call
-  should not be interrupted when the consumer queue size is zero. The
-  default value is 1000 messages and should work well for most use
-  cases.
-* `max_total_receiver_queue_size_across_partitions`
-  Set the max total receiver queue size across partitions.
-  This setting will be used to reduce the receiver queue size for 
individual partitions
-* `consumer_name`:
-  Sets the consumer name.
-* `unacked_messages_timeout_ms`:
-  Sets the timeout in milliseconds for unacknowledged messages. The
-  timeout needs to be greater than 10 seconds. An exception is thrown 
if
-  the given value is less than 10 seconds. If a successful
-  acknowledgement is not sent within the timeout, all the 
unacknowledged
-  messages are redelivered.
-* `broker_consumer_stats_cache_time_ms`:
-  Sets the time duration for which the broker-side consumer stats will
-  be cached in the client.
-* `properties`:
-  Sets the properties for the consumer. The properties associated with 
a consumer
-  can be used for identify a consumer at broker side.
-"""
-_check_type(str, topic, 'topic')
-_check_type(str, subscription_name, 'subscription_name')
-_check_type(ConsumerType, consumer_type, 'consumer_type')
-_check_type(int, receiver_queue_size, 'receiver_queue_size')
-_check_type(int, max_total_receiver_queue_size_across_partitions,
-'max_total_receiver_queue_size_across_partitions')
-_check_type_or_none(str, consumer_name, 'consumer_name')
-_check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
-_check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
-_check_type(bool, is_read_compacted, 'is_read_compacted')
-_check_type_or_none(dict, properties, 'properties')
-
-
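
The diff above is cut off before the consolidated method body, so the following is only a guess at the general idea. One plausible use of `_retype` is to let a single `subscribe()` accept a topic name, a list of topics, or a compiled regex and branch on the argument's type. The `classify_topic` helper and its return values are hypothetical and do not appear in the PR.

```python
import re

# Same trick as in the diff: obtain the type of a compiled regular expression.
_retype = type(re.compile('x'))


def classify_topic(topic):
    """Return which kind of subscription a `topic` argument would imply."""
    if isinstance(topic, str):
        return 'single'      # one topic name
    if isinstance(topic, list):
        return 'multiple'    # explicit list of topic names
    if isinstance(topic, _retype):
        return 'pattern'     # compiled regex matching topic names
    raise ValueError(
        "Argument 'topic' is expected to be one of: str, list, or compiled regex")
```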

[GitHub] sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic

2018-09-19 Thread GitBox
sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest 
testing behavior deterministic
URL: https://github.com/apache/incubator-pulsar/pull/2585#issuecomment-422882486
 
 
   run java8 tests




[GitHub] sijie commented on issue #2590: Issue #2584: unacked message is not redelivered on time

2018-09-19 Thread GitBox
sijie commented on issue #2590: Issue #2584: unacked message is not redelivered 
on time
URL: https://github.com/apache/incubator-pulsar/pull/2590#issuecomment-422882307
 
 
   run java8 tests




[GitHub] sijie closed pull request #2594: Correcting SQL getting started title

2018-09-19 Thread GitBox
sijie closed pull request #2594: Correcting SQL getting started title
URL: https://github.com/apache/incubator-pulsar/pull/2594
 
 
   




[GitHub] srkukarni closed pull request #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
srkukarni closed pull request #2606: Add support for dead letter topics for 
java functions
URL: https://github.com/apache/incubator-pulsar/pull/2606
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e8c874059c..5284b59f5b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,7 +24,7 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -45,7 +45,6 @@
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
@@ -79,12 +78,12 @@
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
+import org.apache.pulsar.functions.proto.Function.RetryDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
@@ -321,6 +320,10 @@ void processArguments() throws Exception {
 protected Long DEPRECATED_timeoutMs;
 @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
 protected Long timeoutMs;
+@Parameter(names = "--max-message-retries", description = "How many 
times should we try to process a message before giving up")
+protected Integer maxMessageRetries = -1;
+@Parameter(names = "--dead-letter-topic", description = "The topic 
where all messages which could not be processed successfully are sent")
+protected String deadLetterTopic;
 protected FunctionConfig functionConfig;
 protected String userCodeFile;
 
@@ -464,6 +467,13 @@ void processArguments() throws Exception {
 
 functionConfig.setAutoAck(autoAck);
 
+if (null != maxMessageRetries) {
+functionConfig.setMaxMessageRetries(maxMessageRetries);
+}
+if (null != deadLetterTopic) {
+functionConfig.setDeadLetterTopic(deadLetterTopic);
+}
+
 if (null != jarFile) {
 functionConfig.setJar(jarFile);
 }
@@ -717,6 +727,15 @@ protected FunctionDetails convert(FunctionConfig 
functionConfig)
 
Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
 }
 
+if (functionConfig.getMaxMessageRetries() >= 0) {
+RetryDetails.Builder retryBuilder = RetryDetails.newBuilder();
+
retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
+
retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+}
+functionDetailsBuilder.setRetryDetails(retryBuilder);
+}
+
 Map configs = new HashMap<>();
 configs.putAll(functionConfig.getUserConfig());
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b3f86eae0e..1e07516f59 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -547,6 +547,11 @@ public void setupInput(ContextImpl contextImpl) throws 
Exception {
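
The conversion logic added to `convert(FunctionConfig)` in the CmdFunctions.java hunk above can be restated compactly. The sketch below mirrors it in Python with a plain dict in place of the `RetryDetails` builder; `build_retry_details` is a hypothetical name used only for this illustration.

```python
def build_retry_details(max_message_retries=-1, dead_letter_topic=None):
    """Return retry settings, or None when retries are not configured.

    Mirrors the diff: details are only built when the retry count is >= 0,
    and the dead letter topic is only set when it is a non-empty string.
    """
    if max_message_retries is None or max_message_retries < 0:
        return None  # default of -1 means no retry details are attached
    details = {"max_message_retries": max_message_retries}
    if dead_letter_topic:  # skips both None and "", like isNotEmpty
        details["dead_letter_topic"] = dead_letter_topic
    return details
```

Note that with this logic a dead letter topic given without a non-negative `--max-message-retries` is ignored, since the whole block is skipped.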
 

[GitHub] sijie commented on issue #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
sijie commented on issue #2606: Add support for dead letter topics for java 
functions
URL: https://github.com/apache/incubator-pulsar/pull/2606#issuecomment-422843947
 
 
   run java8 tests




[GitHub] sijie commented on issue #2594: Correcting SQL getting started title

2018-09-19 Thread GitBox
sijie commented on issue #2594: Correcting SQL getting started title
URL: https://github.com/apache/incubator-pulsar/pull/2594#issuecomment-422843819
 
 
   run cpp tests




[GitHub] massakam removed a comment on issue #2572: Shorten the timeout value of C++ ZTS client

2018-09-19 Thread GitBox
massakam removed a comment on issue #2572: Shorten the timeout value of C++ ZTS 
client
URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-422764843
 
 
   rerun integration tests




[GitHub] massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client

2018-09-19 Thread GitBox
massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client
URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-422816585
 
 
   rerun integration tests




[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file

2018-09-19 Thread GitBox
srkukarni commented on issue #2593: Add support for running python functions 
with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-422807920
 
 
   run cpp tests




[GitHub] srkukarni commented on issue #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
srkukarni commented on issue #2606: Add support for dead letter topics for java 
functions
URL: https://github.com/apache/incubator-pulsar/pull/2606#issuecomment-422807563
 
 
   run java8 tests




[GitHub] massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client

2018-09-19 Thread GitBox
massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client
URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-422764843
 
 
   rerun integration tests




[GitHub] sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…

2018-09-19 Thread GitBox
sijie commented on issue #2543: Add ServiceUrlProvider and add method 
forceCloseConnection in PulsarC…
URL: https://github.com/apache/incubator-pulsar/pull/2543#issuecomment-422704688
 
 
   @codelipenghui yeah, thank you so much for your contribution! look forward 
to your contribution about zookeeper based service url provider :)




[GitHub] sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic

2018-09-19 Thread GitBox
sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest 
testing behavior deterministic
URL: https://github.com/apache/incubator-pulsar/pull/2585#issuecomment-422678548
 
 
   run java8 tests




[GitHub] massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client

2018-09-19 Thread GitBox
massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client
URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-422675074
 
 
   rerun java8 tests




[GitHub] sijie closed pull request #2586: fixing/improving logging for function instance

2018-09-19 Thread GitBox
sijie closed pull request #2586: fixing/improving logging for function instance
URL: https://github.com/apache/incubator-pulsar/pull/2586
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 11da7c30ac..083686b503 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,27 +22,18 @@
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
-import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index a978a09cb0..c5159b0706 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -22,32 +22,32 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.io.InputStream;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 /**
  * A function container implemented using java thread.
  */
@@ -97,8 +97,14 @@
 // by the child process and manually added to classpath
 args.add(String.format("-D%s=%s", 
FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
 args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
-args.add("-Dpulsar.log.dir=" + logDirectory);
-args.add("-Dpulsar.log.file=" + 
instanceConfig.getFunctionDetails().getName());
+args.add("-Dpulsar.function.log.dir=" + String.format(
+"%s/%s",
+logDirectory,
+
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())));
+args.add("-Dpulsar.function.log.file=" + String.format(
+"%s-%s",
+instanceConfig.getFunctionDetails().getName(),
+instanceConfig.getInstanceId()));
 if (instanceConfig.getFunctionDetails().getResources() != null) {

[GitHub] jerrypeng commented on issue #2586: fixing/improving logging for function instance

2018-09-19 Thread GitBox
jerrypeng commented on issue #2586: fixing/improving logging for function 
instance
URL: https://github.com/apache/incubator-pulsar/pull/2586#issuecomment-422669848
 
 
   rerun integration tests




[GitHub] srkukarni commented on issue #2606: Add support for dead letter topics for java functions

2018-09-19 Thread GitBox
srkukarni commented on issue #2606: Add support for dead letter topics for java 
functions
URL: https://github.com/apache/incubator-pulsar/pull/2606#issuecomment-422666303
 
 
   run java8 tests




[GitHub] cckellogg opened a new pull request #2609: Initial commit for a go based pulsar cli.

2018-09-18 Thread GitBox
cckellogg opened a new pull request #2609: Initial commit for a go based pulsar 
cli.
URL: https://github.com/apache/incubator-pulsar/pull/2609
 
 
   ### Motivation
   
   Initial patch to create a Go-based Pulsar admin CLI. This will help improve 
the CLI performance, make the user experience better with more detailed 
examples and help documentation, and simplify the command structure.
   
   ### Modifications
   
   New go based admin cli
   
   




[GitHub] merlimat opened a new pull request #2608: Enforce boost-python was found by CMake

2018-09-18 Thread GitBox
merlimat opened a new pull request #2608: Enforce boost-python was found by 
CMake
URL: https://github.com/apache/incubator-pulsar/pull/2608
 
 
   ### Motivation
   
   Ensure the CMake preparation fails when the correct Boost-Python library is 
not found. With the current behavior, the build goes through even when not 
found, and then the resulting `_pulsar.so` will not be linked with boost-python 
and will fail to load at runtime.




[GitHub] sijie closed pull request #2604: [tests] remove PersistentQueueE2ETest#testConsumersWithDifferentPermits

2018-09-18 Thread GitBox
sijie closed pull request #2604: [tests] remove 
PersistentQueueE2ETest#testConsumersWithDifferentPermits
URL: https://github.com/apache/incubator-pulsar/pull/2604
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 




[GitHub] rdhabalia closed pull request #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia closed pull request #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 38a55bf47c..8f9eefea94 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ void runCmd() throws Exception {
 protected String DEPRECATED_sinkConfigString;
 @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
 protected String sinkConfigString;
+@Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
+protected boolean autoAck = true;
+@Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
+protected Long timeoutMs;
 
 protected SinkConfig sinkConfig;
 
@@ -399,6 +403,15 @@ void processArguments() throws Exception {
 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
 }
 
+sinkConfig.setAutoAck(autoAck);
+if (timeoutMs != null) {
+sinkConfig.setTimeoutMs(timeoutMs);
+}
+
+if (null != sinkConfigString) {
+sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+}
+
 inferMissingArguments(sinkConfig);
 }
 
@@ -585,7 +598,11 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
 : SubscriptionType.SHARED;
 sourceSpecBuilder.setSubscriptionType(subType);
 
-functionDetailsBuilder.setAutoAck(true);
+functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+if (sinkConfig.getTimeoutMs() != null) {
+sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+}
+
 functionDetailsBuilder.setSource(sourceSpecBuilder);
 
 // set up sink spec
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index e3ba70e025..fb13d39913 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -133,6 +133,7 @@ public SinkConfig getSinkConfig() {
 sinkConfig.setTenant(TENANT);
 sinkConfig.setNamespace(NAMESPACE);
 sinkConfig.setName(NAME);
+sinkConfig.setAutoAck(true);
 
 sinkConfig.setInputs(INPUTS_LIST);
 sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index be886c41ae..1132fa6b86 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -78,6 +78,9 @@
 private boolean retainOrdering;
 @isValidResources
 private Resources resources;
+private boolean autoAck;
+@isPositiveNumber
+private Long timeoutMs;
 
 @isFileExists
 private String archive;
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 5b1fb41d8b..42f6c9c181 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1014,6 +1014,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `update`
@@ -1091,6 +1093,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `available-sinks`


 



[GitHub] srkukarni commented on issue #2586: fixing/improving logging for function instance

2018-09-18 Thread GitBox
srkukarni commented on issue #2586: fixing/improving logging for function 
instance
URL: https://github.com/apache/incubator-pulsar/pull/2586#issuecomment-422638074
 
 
   run java8 tests




[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218635418
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,153 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
 
 Review comment:
   I don't understand this logic. Here it creates a producer only if one is 
not already present in `publishProducers`. So, is this really needed?
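
   For context on the question above, a minimal sketch of the get / create / 
`putIfAbsent` pattern follows. The quoted diff is cut off at the `putIfAbsent` 
call, so everything after that point is an assumption about how such a cache is 
usually completed, not the code in this pull request. `ProducerCacheSketch` and 
`FakeProducer` are hypothetical names used only for illustration. The point of 
the second check is that another thread may insert a producer between the 
initial `get` and the `putIfAbsent`; the thread that loses closes its own 
instance and uses the one already stored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCacheSketch {

    /** Hypothetical stand-in for a Pulsar producer; it only records whether it was closed. */
    static final class FakeProducer implements AutoCloseable {
        final String topic;
        volatile boolean closed;

        FakeProducer(String topic) {
            this.topic = topic;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final Map<String, FakeProducer> producers = new ConcurrentHashMap<>();

    /** Counts how many producers were constructed, for demonstration only. */
    final AtomicInteger created = new AtomicInteger();

    FakeProducer getProducer(String topic) {
        FakeProducer producer = producers.get(topic);
        if (producer == null) {
            // No cached producer was visible to this thread, so build one.
            FakeProducer fresh = new FakeProducer(topic);
            created.incrementAndGet();

            // Another thread may have stored a producer since the get() above.
            FakeProducer existing = producers.putIfAbsent(topic, fresh);
            if (existing != null) {
                // Lost the race: discard our instance and reuse the stored one.
                fresh.close();
                producer = existing;
            } else {
                producer = fresh;
            }
        }
        return producer;
    }

    public static void main(String[] args) {
        ProducerCacheSketch cache = new ProducerCacheSketch();
        FakeProducer first = cache.getProducer("topic-a");
        FakeProducer second = cache.getProducer("topic-a");
        System.out.println(first == second);      // same cached instance
        System.out.println(cache.created.get());  // constructed once
    }
}
```

   An alternative under the same assumptions would be `computeIfAbsent`, which 
avoids constructing a producer that is then thrown away, at the cost of running 
the creation inside the map's internal locking.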




[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng commented on issue #2605: implement topic routing on a per record 
basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-422597240
 
 
   @rdhabalia thanks for the review.  I have addressed your comments




[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590573
 
 
   rerun java8 tests




[GitHub] rdhabalia removed a comment on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia removed a comment on issue #2503: add auto ack and timeout 
configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590512
 
 
   retest java8 tests




[GitHub] rdhabalia commented on issue #2503: add auto ack and timeout configurable

2018-09-18 Thread GitBox
rdhabalia commented on issue #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-422590512
 
 
   retest java8 tests




[GitHub] aahmed-se commented on issue #2578: Add support for schema extraction from a jar

2018-09-18 Thread GitBox
aahmed-se commented on issue #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#issuecomment-422589407
 
 
   Added Test cases




[GitHub] srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes

2018-09-18 Thread GitBox
srkukarni commented on issue #2577: fix behavior of JSONSchema for derived 
classes
URL: https://github.com/apache/incubator-pulsar/pull/2577#issuecomment-422578421
 
 
   run java8 tests




[GitHub] rdhabalia closed pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg

2018-09-18 Thread GitBox
rdhabalia closed pull request #2549: [Function] avoid creating assignment 
snapshot and publish individual assignment msg
URL: https://github.com/apache/incubator-pulsar/pull/2549
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb9f6..0c7b8af695 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,6 +44,8 @@ rescheduleTimeoutMs: 6
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 3
+# Frequency how often worker performs compaction on function-topics
+topicCompactionFrequencySec: 1800
 metricsSamplingPeriodSec: 60
 # Enforce authentication
 authenticationEnabled: false
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 425e04921b..612b33601b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -359,4 +359,4 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map
 this.latestForKey = latestForKey;
 }
 }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 32c93b4626..8e7b292d6a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1268,4 +1268,4 @@ public void testEmptyCompactionLedger() throws Exception {
 }
 }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
new file mode 100644
index 00..fe28a5119e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
+import 

[GitHub] aahmed-se opened a new pull request #2607: Add default value for test image org tag

2018-09-18 Thread GitBox
aahmed-se opened a new pull request #2607: Add default value for test image org 
tag
URL: https://github.com/apache/incubator-pulsar/pull/2607
 
 
   This adds a default value for the image tag in case it is not specified




[GitHub] srkukarni opened a new pull request #2606: Add support for dead letter topics for java functions

2018-09-18 Thread GitBox
srkukarni opened a new pull request #2606: Add support for dead letter topics 
for java functions
URL: https://github.com/apache/incubator-pulsar/pull/2606
 
 
   ### Motivation
   
   This pr adds two options for running java functions
   1. Ability to control the number of times a message is retried by Pulsar
   2. Ability to redirect the messages that could not be processed even after 
retries to a dead letter topic
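
   The two options can be pictured with the small in-memory model below. It is 
a sketch under stated assumptions, not the implementation in this pull request 
and not the Pulsar client API: `DeadLetterSketch`, `maxRetries`, and the 
`deadLetters` list are hypothetical names, and the dead letter topic is modeled 
as a plain list. A message is handed to the handler once plus up to 
`maxRetries` further times; if every attempt throws, it is moved to the dead 
letter list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DeadLetterSketch {

    /** Number of retries allowed after the first failed attempt (hypothetical setting). */
    final int maxRetries;

    /** In-memory stand-in for a dead letter topic. */
    final List<String> deadLetters = new ArrayList<>();

    DeadLetterSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Tries to process the message, retrying on failure.
     * Returns true if the handler succeeded, false if the message was dead-lettered.
     */
    boolean deliver(String message, Consumer<String> handler) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Processing failed; loop around and retry if attempts remain.
            }
        }
        // All attempts failed: redirect the message instead of dropping it.
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        DeadLetterSketch sketch = new DeadLetterSketch(2);
        sketch.deliver("good", m -> { });
        sketch.deliver("bad", m -> { throw new IllegalStateException("cannot process"); });
        System.out.println(sketch.deadLetters);
    }
}
```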
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   




[GitHub] sijie closed pull request #2587: [tests] improve connector related integration tests

2018-09-18 Thread GitBox
sijie closed pull request #2587: [tests] improve connector related integration 
tests
URL: https://github.com/apache/incubator-pulsar/pull/2587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 08ff859e44..a509e19254 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
   connectors
   644
 
+
+
+  
${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar
+  connectors
+  644
+
   
 
diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7297..302bda80cc 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   org.apache.pulsar
   docker-images
   Apache Pulsar :: Docker Images
-  
-apachepulsar
-  
   
 pulsar
 grafana
diff --git a/pom.xml b/pom.xml
index 6d45a0093d..91ba14c55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.
 true
 false
 1
+apachepulsar
 
 
 
2.1.0-incubating
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0caf..fb7a76cdff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public void testReplayOnConsumerDisconnect() throws 
Exception {
 deleteTopic(topicName);
 }
 
-@Test
-public void testConsumersWithDifferentPermits() throws Exception {
-final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
-final String subName = "sub4";
-final int numMsgs = 1;
-
-final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
-final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
-final CountDownLatch latch = new CountDownLatch(numMsgs);
-
-int recvQ1 = 10;
-Consumer consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
-.messageListener((consumer, msg) -> {
-msgCountConsumer1.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-latch.countDown();
-} catch (PulsarClientException e) {
-fail("Should not fail");
-}
-}).subscribe();
-
-int recvQ2 = 1;
-Consumer consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
-.messageListener((consumer, msg) -> {
-msgCountConsumer2.incrementAndGet();
-try {
-consumer.acknowledge(msg);
-latch.countDown();
-} catch (PulsarClientException e) {
-fail("Should not fail");
-}
-}).subscribe();
-
-List> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-Producer producer = pulsarClient.newProducer().topic(topicName)
-.enableBatching(false)
-.maxPendingMessages(numMsgs + 1)
-.messageRoutingMode(MessageRoutingMode.SinglePartition)
-.create();
-for (int i = 0; i < numMsgs; i++) {
-String message = "msg-" + i;
-futures.add(producer.sendAsync(message.getBytes()));
-}
-FutureUtil.waitForAll(futures).get();
-producer.close();
-
-latch.await(5, TimeUnit.SECONDS);
-
-assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + 
recvQ2), numMsgs * 0.1);
-assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), 
numMsgs * 0.1);
-
-consumer1.close();
-consumer2.close();
-
-deleteTopic(topicName);
-}
-
 // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
 // expected numbers. keeping this disabled to not break the build, but 
nevertheless this gives good insight into
 // how the round robin distribution algorithm is behaving
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 

[GitHub] sijie closed pull request #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
sijie closed pull request #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7297..302bda80cc 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   org.apache.pulsar
   docker-images
   Apache Pulsar :: Docker Images
-  
-apachepulsar
-  
   
 pulsar
 grafana
diff --git a/pom.xml b/pom.xml
index 6d45a0093d..91ba14c55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.
 true
 false
 1
+apachepulsar
 
 
 
2.1.0-incubating


 




[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606849
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218606215
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603818
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218603562
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218605442
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
rdhabalia commented on a change in pull request #2605: implement topic routing 
on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r218604865
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 ##
 @@ -60,140 +62,166 @@
 private final String fqfn;
 
 private interface PulsarSinkProcessor {
-void initializeOutputProducer(String outputTopic, Schema schema, 
String fqfn) throws Exception;
 
 TypedMessageBuilder newMessage(Record record) throws Exception;
 
 void sendOutputMessage(TypedMessageBuilder msg, Record record) 
throws Exception;
 
-abstract void close() throws Exception;
+void close() throws Exception;
 }
 
-private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
+private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor {
+protected Map> publishProducers = new 
ConcurrentHashMap<>();
+protected Schema schema;
 
-@Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+protected PulsarSinkProcessorBase(Schema schema) {
+this.schema = schema;
 }
 
-@Override
-public TypedMessageBuilder newMessage(Record record) {
-return producer.newMessage();
+public  Producer createProducer(PulsarClient client, String 
topic, String producerName, Schema schema, String fqfn)
+throws PulsarClientException {
+ProducerBuilder builder = client.newProducer(schema)
+.blockIfQueueFull(true)
+.enableBatching(true)
+.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+.compressionType(CompressionType.LZ4)
+.hashingScheme(HashingScheme.Murmur3_32Hash) //
+.messageRoutingMode(MessageRoutingMode.CustomPartition)
+.messageRouter(FunctionResultRouter.of())
+.topic(topic);
+if (producerName != null) {
+builder.producerName(producerName);
+}
+
+return builder
+.property("application", "pulsarfunction")
+.property("fqfn", fqfn).create();
 }
 
-@Override
-public void sendOutputMessage(TypedMessageBuilder msg, Record 
record) throws Exception {
-msg.sendAsync();
+protected Producer getProducer(String destinationTopic) {
+return getProducer(destinationTopic, null, destinationTopic);
 }
 
-@Override
-public void close() throws Exception {
-if (null != producer) {
+protected Producer getProducer(String producerId, String 
producerName, String topicName) {
+
+Producer producer = publishProducers.get(producerId);
+
+if (producer == null) {
 try {
-producer.close();
+Producer newProducer = createProducer(
+client,
+topicName,
+producerName,
+schema,
+fqfn);
+
+Producer existingProducer = 
publishProducers.putIfAbsent(producerId, newProducer);
+
+if (existingProducer != null) {
+// The value in the map was not updated after the 
concurrent put
+newProducer.close();
+producer = existingProducer;
+} else {
+producer = newProducer;
+}
+
 } catch (PulsarClientException e) {
-log.warn("Fail to close producer for processor {}", 
pulsarSinkConfig.getTopic(), e);
+log.error("Failed to create Producer while doing user 
publish", e);
+throw new RuntimeException(e);
 }
 }
+return producer;
 }
-}
-
-private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-private Producer producer;
 
 @Override
-public void initializeOutputProducer(String outputTopic, Schema 
schema, String fqfn) throws Exception {
-this.producer = AbstractOneOuputTopicProducers.createProducer(
-client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+public void close() throws Exception {
+List> closeFutures = new 
ArrayList<>(publishProducers.size());
+for (Map.Entry> entry: 
publishProducers.entrySet()) {
+String 

[GitHub] jerrypeng opened a new pull request #2605: implement topic routing on a per record basis

2018-09-18 Thread GitBox
jerrypeng opened a new pull request #2605: implement topic routing on a per 
record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605
 
 
   ### Motivation
   
   There are use cases in which the destination topic for a message cannot be 
determined at source submission time.  This requires that sources be able to 
set the topic each record should be written to.
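
A minimal sketch of the idea, not the PR's actual code: when each record can name its own destination topic, the sink needs one producer per topic, created lazily and shared across threads. The `putIfAbsent` handling mirrors the `getProducer` pattern visible in the `PulsarSink` diff for this PR. `TopicProducer`, `InMemoryProducer`, and `PerTopicProducerCache` are hypothetical stand-ins introduced only for illustration; they are not part of the Pulsar client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a producer bound to a single topic.
interface TopicProducer extends AutoCloseable {
    void send(String payload);

    @Override
    void close();
}

// Hypothetical in-memory producer that records what was sent to it.
final class InMemoryProducer implements TopicProducer {
    private final String topic;
    private final List<String> sent = new ArrayList<>();
    private boolean closed;

    InMemoryProducer(String topic) {
        this.topic = topic;
    }

    @Override
    public synchronized void send(String payload) {
        if (closed) {
            throw new IllegalStateException("producer for " + topic + " is closed");
        }
        sent.add(payload);
    }

    @Override
    public synchronized void close() {
        closed = true;
    }

    synchronized List<String> sent() {
        return new ArrayList<>(sent);
    }

    synchronized boolean isClosed() {
        return closed;
    }
}

// Keeps one producer per destination topic, created on first use.
final class PerTopicProducerCache {
    private final Map<String, TopicProducer> producers = new ConcurrentHashMap<>();
    private final Function<String, TopicProducer> factory;

    PerTopicProducerCache(Function<String, TopicProducer> factory) {
        this.factory = factory;
    }

    TopicProducer getProducer(String topic) {
        TopicProducer existing = producers.get(topic);
        if (existing != null) {
            return existing;
        }
        TopicProducer created = factory.apply(topic);
        TopicProducer raced = producers.putIfAbsent(topic, created);
        if (raced != null) {
            // Another thread registered a producer first: discard ours, reuse theirs.
            created.close();
            return raced;
        }
        return created;
    }

    // Routes a single record to the topic chosen for that record.
    void send(String destinationTopic, String payload) {
        getProducer(destinationTopic).send(payload);
    }

    void closeAll() {
        for (TopicProducer producer : producers.values()) {
            producer.close();
        }
        producers.clear();
    }
}
```

Calling `cache.send("topic-a", payload)` and then `cache.send("topic-b", payload)` creates two producers, and later sends to either topic reuse them. The factory may occasionally run twice for the same topic under contention, which is why the losing instance is closed rather than leaked.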
   
   




[GitHub] srkukarni commented on issue #2597: [build] Fix docker organization parameter

2018-09-18 Thread GitBox
srkukarni commented on issue #2597: [build] Fix docker organization parameter
URL: https://github.com/apache/incubator-pulsar/pull/2597#issuecomment-422562254
 
 
   run integration tests



