[GitHub] zhaijack opened a new pull request #1359: Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead

2018-03-07 Thread GitBox
zhaijack opened a new pull request #1359: Delete PartitionedConsumerImpl, use 
TopicsConsumerImpl instead
URL: https://github.com/apache/incubator-pulsar/pull/1359
 
 
   ### Motivation
   
   Currently we have both TopicsConsumerImpl and PartitionedConsumerImpl. Their 
behaviour and code are similar: each contains a list of ConsumerImpl instances 
and makes them work together. This PR deletes PartitionedConsumerImpl and uses 
TopicsConsumerImpl instead.
   
   ### Modifications
   
   Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead.
   
   ### Result
   
   No API changes.
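
   The shared pattern mentioned in the motivation (one parent object holding a list of child consumers and making them work together) can be sketched roughly as follows. This is only an illustration: `SimpleConsumer`, `QueueConsumer` and `CompositeConsumer` are hypothetical names, not classes from the Pulsar client, and the round-robin polling is an assumption rather than what TopicsConsumerImpl actually does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a single-topic (or single-partition) consumer.
interface SimpleConsumer {
    String poll(); // returns the next message, or null if none is pending
}

// Queue-backed consumer, present only to make the sketch runnable.
class QueueConsumer implements SimpleConsumer {
    private final Queue<String> pending = new ArrayDeque<>();

    QueueConsumer(String... messages) {
        for (String m : messages) {
            pending.add(m);
        }
    }

    @Override
    public String poll() {
        return pending.poll();
    }
}

// One parent that fans in messages from a list of child consumers.
class CompositeConsumer implements SimpleConsumer {
    private final List<SimpleConsumer> children;
    private int next = 0;

    CompositeConsumer(List<SimpleConsumer> children) {
        this.children = new ArrayList<>(children);
    }

    @Override
    public String poll() {
        // Visit each child at most once, starting after the last one served.
        for (int i = 0; i < children.size(); i++) {
            SimpleConsumer child = children.get(next);
            next = (next + 1) % children.size();
            String message = child.poll();
            if (message != null) {
                return message;
            }
        }
        return null; // every child is currently empty
    }
}
```

   Because the parent implements the same interface as its children, callers do not need to know whether the children are partitions of one topic or separate topics, which is the kind of overlap that makes keeping two near-identical classes unnecessary.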


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jai1 commented on issue #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
jai1 commented on issue #1322: Use private impl for MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#issuecomment-371403998
 
 
   retest this please




[GitHub] zhaijack commented on issue #1354: Issue 1118: refine handlerBase to let only ConsumerImpl and ProducerImpl have client-cnx

2018-03-07 Thread GitBox
zhaijack commented on issue #1354: Issue 1118: refine handlerBase to let only 
ConsumerImpl and ProducerImpl have client-cnx
URL: https://github.com/apache/incubator-pulsar/pull/1354#issuecomment-371340762
 
 
   retest this please




[GitHub] zhaijack commented on issue #1354: Issue 1118: refine handlerBase to let only ConsumerImpl and ProducerImpl have client-cnx

2018-03-07 Thread GitBox
zhaijack commented on issue #1354: Issue 1118: refine handlerBase to let only 
ConsumerImpl and ProducerImpl have client-cnx
URL: https://github.com/apache/incubator-pulsar/pull/1354#issuecomment-371397512
 
 
   retest this please




[GitHub] jai1 commented on a change in pull request #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
jai1 commented on a change in pull request #1322: Use private impl for 
MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#discussion_r173070365
 
 

 ##
 File path: pulsar-client-cpp/lib/MessageIdImpl.h
 ##
 @@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
 
 Review comment:
   "#pragma once" is compiler specific and potentially not portable. 
"#ifndef/#define/#endif" is more portable and will work for all preprocessors.
   
   




[GitHub] jai1 commented on issue #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
jai1 commented on issue #1322: Use private impl for MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#issuecomment-371388067
 
 
   retest this please




[GitHub] yush1ga commented on issue #1352: Delete inactive subscriptions automatically

2018-03-07 Thread GitBox
yush1ga commented on issue #1352: Delete inactive subscriptions automatically
URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-371378494
 
 
   retest this please








[GitHub] zhaijack opened a new pull request #1358: Issue 1071: add ratelimiter for subscription

2018-03-07 Thread GitBox
zhaijack opened a new pull request #1358: Issue 1071: add ratelimiter for 
subscription
URL: https://github.com/apache/incubator-pulsar/pull/1358
 
 
   ### Motivation
   
   Currently rate limiting and throttling are applied at the namespace/topic 
level. It would be good to provide a mechanism to apply these settings at the 
subscription level, so users can apply different rate-limiting and throttling 
strategies to different subscriptions.
   
   ### Modifications
   
   Provide a rate limiter setting for subscriptions; all subscriptions under 
the same namespace will have the same rate limiter. This is similar to what we 
have for topics.
   
   ### Result
   
   Current behaviour will not change. 
   Users can set a rate limiter for subscriptions.
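
   To illustrate the idea of one namespace-level setting applied separately to each subscription, here is a minimal token-bucket sketch. It is an assumption-laden illustration, not the code in this PR: `TokenBucket` and `SubscriptionRateLimiters` are hypothetical names, the token-bucket algorithm is a stand-in for whatever limiter the broker uses, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical token bucket: holds up to `capacity` permits, refilled at a fixed rate.
class TokenBucket {
    private final long capacity;
    private final long refillPerSecond;
    private long tokens;
    private long lastRefillMillis;

    TokenBucket(long capacity, long refillPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity; // start full so an initial burst is allowed
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean tryAcquire(long permits, long nowMillis) {
        refill(nowMillis);
        if (permits > tokens) {
            return false; // not enough permits: the caller should delay dispatch
        }
        tokens -= permits;
        return true;
    }

    private void refill(long nowMillis) {
        long elapsed = nowMillis - lastRefillMillis;
        long newTokens = elapsed * refillPerSecond / 1000;
        if (newTokens > 0) {
            tokens = Math.min(capacity, tokens + newTokens);
            // Advance only by the time that produced whole tokens, keeping the remainder.
            lastRefillMillis += newTokens * 1000 / refillPerSecond;
        }
    }
}

// One bucket per subscription, all created from the same namespace-level rate.
class SubscriptionRateLimiters {
    private final long ratePerSecond;
    private final Map<String, TokenBucket> buckets = new HashMap<>();

    SubscriptionRateLimiters(long namespaceRatePerSecond) {
        this.ratePerSecond = namespaceRatePerSecond;
    }

    synchronized boolean tryDispatch(String subscription, long messages, long nowMillis) {
        TokenBucket bucket = buckets.computeIfAbsent(
                subscription, s -> new TokenBucket(ratePerSecond, ratePerSecond, nowMillis));
        return bucket.tryAcquire(messages, nowMillis);
    }
}
```

   In this sketch each subscription draws from its own bucket, so a busy subscription cannot consume the budget of another one even though both are configured by the same namespace-level rate.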
   




[GitHub] srkukarni commented on a change in pull request #1357: Pulsar Functions documentation follow-up

2018-03-07 Thread GitBox
srkukarni commented on a change in pull request #1357: Pulsar Functions 
documentation follow-up
URL: https://github.com/apache/incubator-pulsar/pull/1357#discussion_r173002324
 
 

 ##
 File path: site/docs/latest/functions/quickstart.md
 ##
 @@ -1,29 +1,176 @@
 ---
 title: Getting started with Pulsar Functions
+lead: Write and run your first Pulsar Function in just a few steps
 ---
 
-## The `pulsar-functions` CLI tool
+This tutorial will walk you through running a {% popover standalone %} Pulsar 
{% popover cluster %} on your machine and then running your first Pulsar 
Functions using that cluster. The first function will run in local run mode 
(outside your Pulsar {% popover cluster %}), while the second will run in 
cluster mode (inside your cluster).
 
-[`pulsar-functions`](../../reference/CliTools#pulsar-functions)
+{% include admonition.html content="In local run mode, your Pulsar Function 
will communicate with your Pulsar cluster but will run outside of the cluster." 
%}
+
+## Prerequisites
+
+In order to follow along with this tutorial, you'll need to have 
[Maven](https://maven.apache.org/download.cgi) installed on your machine.
+
+## Run a standalone Pulsar cluster
+
+In order to run our Pulsar Functions, we'll need to run a Pulsar cluster 
locally first. The easiest way to do that is to run Pulsar in {% popover 
standalone %} mode. Follow these steps to start up a standalone cluster:
+
+```bash
+$ wget 
https://github.com/streamlio/incubator-pulsar/releases/download/2.0.0-incubating-functions-preview/apache-pulsar-2.0.0-incubating-functions-preview-bin.tar.gz
+$ tar xvf apache-pulsar-2.0.0-incubating-functions-preview-bin.tar.gz
+$ cd apache-pulsar-2.0.0-incubating-functions-preview
+$ bin/pulsar standalone \
+  --advertised-address 127.0.0.1
+```
+
+When running Pulsar in standalone mode, the `sample` {% popover tenant %} and 
`ns1` {% popover namespace %} will be created automatically for you. That 
tenant and namespace will be used throughout this tutorial.
+
+## Run a Pulsar Function in local run mode
+
+To run a function in local mode, i.e. outside our Pulsar cluster:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+  --jar examples/api-examples.jar \
+  --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
+  --inputs persistent://sample/standalone/ns1/exclamation-input \
+  --output persistent://sample/standalone/ns1/exclamation-output \
+  --name exclamation
+```
+
+The JAR file containing the function (written in Java) is included with the 
binary distribution you downloaded above.
+
+We can use the [`pulsar-client`](../../reference/CliTools#pulsar-client) CLI 
tool to publish a message to the input topic:
+
+```bash
+$ bin/pulsar-client produce 
persistent://sample/standalone/ns1/exclamation-input \
+  --num-produce 1 \
+  --messages "Hello world"
+```
+
+Here's what happens next:
+
+* The `Hello world` message that we published to the input {% popover topic %} 
(`persistent://sample/standalone/ns1/exclamation-input`) will be passed to the 
exclamation function that we're now running on our laptop
+* The exclamation function will process the message (providing a result of 
`Hello world!`) and publish the result to the output topic 
(`persistent://sample/standalone/ns1/exclamation-output`).
+* Pulsar will durably store the message data published to the output topic in 
[Apache BookKeeper](https://bookkeeper.apache.org) until a {% popover consumer 
%} consumes and {% popover acknowledges %} the message
+
+To consume the message, we can use the same 
[`pulsar-client`](../../reference/CliTools#pulsar-client) tool that we used to 
publish the original message:
+
+```bash
+$ bin/pulsar-client consume 
persistent://sample/standalone/ns1/exclamation-output \
+  --subscription-name my-subscription \
+  --num-messages 1
+```
+
+In the output, you should see the following:
+
+```
+- got message -
+Hello world!
+```
+
+Success! As you can see, the message has been successfully processed by the 
exclamation function. To shut down the function, simply hit **Ctrl+C**.
+
+## Run a Pulsar Function in cluster mode
+
+[Local run mode](#run-a-pulsar-function-in-local-run-mode) is useful for 
development and experimentation, but if you wanted to use Pulsar Functions in a 
real Pulsar deployment, you'd want to run them in **cluster mode**. In cluster 
mode, Pulsar Functions run *inside* your Pulsar cluster and are managed using 
the same `pulsar-admin functions` interface that we've been using thus far.
 
 Review comment:
   The wording can be changed here. People can actually launch topologies in 
localrun to be run on clusters like Mesos/Kubernetes. Running inside a Pulsar 
cluster is just another convenience mode.



[GitHub] srkukarni commented on a change in pull request #1357: Pulsar Functions documentation follow-up

2018-03-07 Thread GitBox
srkukarni commented on a change in pull request #1357: Pulsar Functions 
documentation follow-up
URL: https://github.com/apache/incubator-pulsar/pull/1357#discussion_r172999405
 
 

 ##
 File path: site/docs/latest/functions/api.md
 ##
 @@ -2,9 +2,70 @@
 title: The Pulsar Functions API
 ---
 
+Pulsar Functions provides an easy-to-use API that develoeprs can use to 
+
+## Core programming model
+
+Pulsar Functions
+
+### Source and sink topics
+
+All Pulsar Functions have one or more **source topics** that supply messages 
to the function.
+
+### Sink topic
+
+At the moment, Pulsar Functions can have at most one **sink topic** to which 
processing results are published.
+
+### SerDe
+
+SerDe stands for **Ser**ialization and **De**serialization
+
+## Context
+
+Both the [Java](#java-functions-with-context) and 
[Python](#python-functions-with-context) APIs provide optional access to a 
**context object** that can be used by the function. This context object 
provides a wide variety of information to the function:
+
+* The name and ID of the Pulsar Function
+* The message ID of each message
+* The name of the topic on which the message was sent
+* The names of all [source topics](#source-topics) and the [sink 
topics](#sink-topic) associated with the function
+* The name of the class used for [SerDe](#serde)
+* The {% popover tenant %} and {% popover namespace %} associated with the 
function
+* The ID of the Pulsar Functions instance running the function
+* The version of the function
+* The logger object used by the function
+
 
 Review comment:
   Also access to state




[GitHub] srkukarni commented on a change in pull request #1357: Pulsar Functions documentation follow-up

2018-03-07 Thread GitBox
srkukarni commented on a change in pull request #1357: Pulsar Functions 
documentation follow-up
URL: https://github.com/apache/incubator-pulsar/pull/1357#discussion_r172999261
 
 

 ##
 File path: site/docs/latest/functions/api.md
 ##
 @@ -2,9 +2,70 @@
 title: The Pulsar Functions API
 ---
 
+Pulsar Functions provides an easy-to-use API that develoeprs can use to 
+
+## Core programming model
+
+Pulsar Functions
+
+### Source and sink topics
+
+All Pulsar Functions have one or more **source topics** that supply messages 
to the function.
+
+### Sink topic
+
+At the moment, Pulsar Functions can have at most one **sink topic** to which 
processing results are published.
+
 
 Review comment:
   You can add that, inside the function logic, a function can explicitly 
publish to as many topics as it wants.
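
   The point above can be illustrated with a small sketch in which a function returns one value for its single sink topic while also publishing to other topics from inside its own logic. Everything here is hypothetical: `TopicPublisher` stands in for whatever publishing facility the function's context exposes, and the topic names `audit-topic` and `length-topic` are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the publishing capability a function gets from its context.
interface TopicPublisher {
    void publish(String topic, String payload);
}

// In-memory publisher that records what was sent, so the sketch can run without a broker.
class RecordingPublisher implements TopicPublisher {
    final Map<String, List<String>> published = new HashMap<>();

    @Override
    public void publish(String topic, String payload) {
        published.computeIfAbsent(topic, t -> new ArrayList<>()).add(payload);
    }
}

class RoutingFunction {
    // The return value plays the role of the result sent to the single sink topic.
    static String process(String input, TopicPublisher publisher) {
        // Explicit publishes from inside the function logic, to any number of topics.
        publisher.publish("audit-topic", input);
        publisher.publish("length-topic", String.valueOf(input.length()));
        return input + "!";
    }
}
```

   With this shape, the "at most one sink topic" limit only constrains where the return value goes; extra outputs are a matter of how many explicit publish calls the function makes.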




[GitHub] mgodave commented on issue #1319: Schema registry (2/N)

2018-03-07 Thread GitBox
mgodave commented on issue #1319: Schema registry (2/N)
URL: https://github.com/apache/incubator-pulsar/pull/1319#issuecomment-371300300
 
 
   Ping




[GitHub] lucperkins opened a new pull request #1357: Pulsar Functions documentation follow-up

2018-03-07 Thread GitBox
lucperkins opened a new pull request #1357: Pulsar Functions documentation 
follow-up
URL: https://github.com/apache/incubator-pulsar/pull/1357
 
 
   




[GitHub] merlimat commented on issue #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
merlimat commented on issue #1322: Use private impl for MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#issuecomment-371297087
 
 
   @jai1 Addressed comments




[GitHub] merlimat commented on issue #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
merlimat commented on issue #1322: Use private impl for MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#issuecomment-371296027
 
 
   retest this please






[GitHub] sijie closed pull request #1356: Log Topic for Functions

2018-03-07 Thread GitBox
sijie closed pull request #1356: Log Topic for Functions
URL: https://github.com/apache/incubator-pulsar/pull/1356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 553f1c658..c8010e778 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -143,6 +143,8 @@ void processArguments() throws Exception {}
 protected String inputs;
 @Parameter(names = "--output", description = "Output Topic Name")
 protected String output;
+@Parameter(names = "--logTopic", description = "Log Topic")
+protected String logTopic;
 @Parameter(names = "--customSerdeInputs", description = "Map of input 
topic to serde classname")
 protected String customSerdeInputString;
 @Parameter(names = "--outputSerdeClassName", description = "Output 
SerDe")
@@ -184,6 +186,9 @@ void processArguments() throws Exception {
 if (null != output) {
 functionConfigBuilder.setOutput(output);
 }
+if (null != logTopic) {
+functionConfigBuilder.setLogTopic(logTopic);
+}
 if (null != tenant) {
 functionConfigBuilder.setTenant(tenant);
 }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 85668069d..95fcd7a91 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -16,8 +16,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
+# 
+  
 # -*- encoding: utf-8 -*-
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
@@ -38,10 +38,10 @@
 
 
 DESCRIPTOR = _descriptor.FileDescriptor(
-  name='Function.proto',
+  name='pulsar-functions/proto/src/main/proto/Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xdb\x05\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x0e\n\x06inputs\x18\x0e 
\x03(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 
\x01(\t\x12H\n\x14processingGuarantees\x18\t 
\x01(\x0e\x32*.proto.FunctionConfig.ProcessingGuarantees\x12\x39\n\nuserConfig\x18\n
 
\x03(\x0b\x32%.proto.FunctionConfig.UserConfigEntry\x12@\n\x10subscriptionType\x18\x0b
 \x01(\x0e\x32&.proto.FunctionConfig.SubscriptionType\x12.\n\x07runtime\x18\x0c 
\x01(\x0e\x32\x1d.proto.FunctionConfig.Runtime\x12\x0f\n\x07\x61utoAck\x18\r 
\x01(\x08\x1a\x38\n\x16\x43ustomSerdeInputsEntry\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t:\x02\x38\x01\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 
\x01(\t\x12\r\n\x05value\x18\x02 
\x01(\t:\x02\x38\x01\"9\n\x14ProcessingGuarantees\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x00\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x01\"-\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\r\n\tEXCLUSIVE\x10\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01
 \x01(\t\"\x9f\x01\n\x10\x46unctionMetaData\x12-\n\x0e\x66unctionConfig\x18\x01 
\x01(\x0b\x32\x15.proto.FunctionConfig\x12\x37\n\x0fpackageLocation\x18\x02 
\x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 
\x01(\x04\x12\x12\n\ncreateTime\x18\x04 
\x01(\x04\"_\n\x08Snapshot\x12\x35\n\x14\x66unctionMetaDataList\x18\x01 
\x03(\x0b\x32\x17.proto.FunctionMetaData\x12\x1c\n\x14lastAppliedMessageId\x18\x02
 \x01(\x0c\"Q\n\nAssignment\x12\x31\n\x10\x66unctionMetaData\x18\x01 
\x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x10\n\x08workerId\x18\x02 
\x01(\tB-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
+  
serialized_pb=_b('\n4pulsar-functions/proto/src/main/proto/Function.proto\x12\x05proto\"\x98\x06\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\

[incubator-pulsar] branch master updated: Log Topic for Functions (#1356)

2018-03-07 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b24ab9  Log Topic for Functions (#1356)
6b24ab9 is described below

commit 6b24ab9cbb7a4d471c05c7244b84243936cba4f6
Author: Sanjeev Kulkarni 
AuthorDate: Wed Mar 7 13:02:33 2018 -0800

Log Topic for Functions (#1356)
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   5 +
 .../instance/src/main/python/Function_pb2.py   | 153 +++--
 pulsar-functions/instance/src/main/python/log.py   |   4 +-
 .../proto/src/main/proto/Function.proto|   1 +
 pulsar-functions/python-examples/logfunction.py|  30 
 .../pulsar/functions/runtime/JavaInstanceMain.java |   6 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |   5 +
 .../functions/runtime/ProcessRuntimeTest.java  |   7 +-
 8 files changed, 164 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 553f1c6..c8010e7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -143,6 +143,8 @@ public class CmdFunctions extends CmdBase {
 protected String inputs;
 @Parameter(names = "--output", description = "Output Topic Name")
 protected String output;
+@Parameter(names = "--logTopic", description = "Log Topic")
+protected String logTopic;
 @Parameter(names = "--customSerdeInputs", description = "Map of input 
topic to serde classname")
 protected String customSerdeInputString;
 @Parameter(names = "--outputSerdeClassName", description = "Output 
SerDe")
@@ -184,6 +186,9 @@ public class CmdFunctions extends CmdBase {
 if (null != output) {
 functionConfigBuilder.setOutput(output);
 }
+if (null != logTopic) {
+functionConfigBuilder.setLogTopic(logTopic);
+}
 if (null != tenant) {
 functionConfigBuilder.setTenant(tenant);
 }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 8566806..95fcd7a 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -16,8 +16,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
+# 
+  
 # -*- encoding: utf-8 -*-
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
@@ -38,10 +38,10 @@ _sym_db = _symbol_database.Default()
 
 
 DESCRIPTOR = _descriptor.FileDescriptor(
-  name='Function.proto',
+  name='pulsar-functions/proto/src/main/proto/Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xdb\x05\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x0e\n\x06inputs\x18\x0e 
\x03(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 
\x01(\t\x12H\n\x14processingGuarantees\x18\t \ [...]
+  
serialized_pb=_b('\n4pulsar-functions/proto/src/main/proto/Function.proto\x12\x05proto\"\x98\x06\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 
\x01(\t\x12H [...]
 )
 
 
@@ -53,18 +53,22 @@ _FUNCTIONCONFIG_PROCESSINGGUARANTEES = 
_descriptor.EnumDescriptor(
   file=DESCRIPTOR,
   values=[
 _descriptor.EnumValueDescriptor(
-  name='ATMOST_ONCE', index=0, number=0,
+  name='ATLEAST_ONCE', index=0, number=0,
+  options=None,
+  type=None),
+_descriptor.EnumValueDescriptor(
+  name='ATMOST_ONCE', index=1, number=1,
   options=None,
   type=None),
 _descriptor.EnumValueDescriptor(
-  name='ATLEAST_ONCE', index=1, number=1,
+  name='EFFECTIVELY_ONCE', index=2, number=2,
   options=None,
   type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=620,
-  serialized_end=677,
+  serialized_start=697,
+  serialized_end=776,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_PROCESSINGGUARANTEES)
 
@@ -85,8

[GitHub] mgodave closed pull request #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
mgodave closed pull request #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1e7764083..9c65fc883 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -20,30 +20,29 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
-
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -66,9 +65,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
 public abstract class AdminResource extends PulsarWebResource {
 private static final Logger log = 
LoggerFactory.getLogger(AdminResource.class);
 private static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
@@ -102,6 +98,21 @@ protected void zkCreateOptimistic(String path, byte[] 
content) throws Exception
 ZkUtils.createFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 }
 
+protected CompletableFuture zkAsyncCreateOptimistic(String path, 
byte[] content) {
+CompletableFuture future = new CompletableFuture<>();
+ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+(rc, path1, ctx, name) -> {
+KeeperException.Code code = KeeperException.Code.get(rc);
+if (code != KeeperException.Code.OK) {
+KeeperException e = KeeperException.create(code);
+future.completeExceptionally(e);
+} else {
+future.complete(null);
+}
+}, null);
+return future;
+}
+
 /**
  * Get the domain of the topic (whether it's persistent or non-persistent)
  */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9386df5ae..5dc142595 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -329,7 +329,7 @@ protected void internalRevokePermissionsOnTopic(String 
role) {
 }
 }
 
-protected void internalCreatePartitionedTopic(int numPartitions, boolean 
authoritative) {
+protected CompletableFuture internalCreatePartitionedTopic(int 
numPartitions, boolean authoritative) {
 validateAdminAccessOnProperty(topicName.getProperty());
 if (numPartitions <= 1) {
 throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 1");
@@ -338,16 +338,18 @@ protected void internalCreatePartitionedTopic(int 
numPartitions, boolean authori
 String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
 topicName.getEncodedLocalName());
 by

[GitHub] srkukarni commented on issue #1356: Log Topic for Functions

2018-03-07 Thread GitBox
srkukarni commented on issue #1356: Log Topic for Functions
URL: https://github.com/apache/incubator-pulsar/pull/1356#issuecomment-371250939
 
 
   @sijie @merlimat 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] srkukarni opened a new pull request #1356: Log Topic for Functions

2018-03-07 Thread GitBox
srkukarni opened a new pull request #1356: Log Topic for Functions
URL: https://github.com/apache/incubator-pulsar/pull/1356
 
 
   
   ### Motivation
   
   Added the ability to specify a log topic to which all logging done in a function is sent. 
This is often a very useful way to debug functions.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   




[GitHub] mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355#issuecomment-371242117
 
 
   Feel free to close it.
   
   On Mar 7, 2018 11:43, "Matteo Merli"  wrote:
   
   > The point is, sleeping a thread in the REST endpoint is a resource hog and
   > generally not a great idea.
   >
   > I'm not debating that :)
   >
   > I think I see what you are saying. Either way, scheduling a return after
   > the sync timeout would be a better option.
   >
   > Definitely. It's still hacky but at least we don't keep the thread hanging.
   >
   




[GitHub] merlimat commented on issue #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
merlimat commented on issue #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355#issuecomment-371240890
 
 
   > The point is, sleeping a thread in the REST endpoint is a resource hog and 
generally not a great idea. 
   
   I'm not debating that :) 
   
   > I think I see what you are saying. Either way, scheduling a return after 
the sync timeout would be a better option.
   
   Definitely. It's still hacky but at least we don't keep the thread hanging.




[GitHub] mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355#issuecomment-371238796
 
 
   I *think* I see what you are saying. Either way, scheduling a return after 
the sync timeout would be a better option.




[GitHub] mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
mgodave commented on issue #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355#issuecomment-371238039
 
 
   The point is, sleeping a thread in the REST endpoint is a resource hog and 
generally not a great idea.




[GitHub] merlimat commented on a change in pull request #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
merlimat commented on a change in pull request #1322: Use private impl for 
MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#discussion_r172939330
 
 

 ##
 File path: pulsar-client-cpp/include/pulsar/MessageId.h
 ##
 @@ -50,34 +47,44 @@ class MessageId {
 /**
  * Serialize the message id into a binary string for storing
  */
-virtual void serialize(std::string& result) const;
+void serialize(std::string& result) const;
 
 Review comment:
   We have `operator<<` though not `operator>>` and these serve a different 
purpose in any case.
   
   Serialize/deserialize are serializing the message id in protobuf format 
(same as `MessageId.serialize()` in Java) and they are used for the Reader so 
that you can store the message id somewhere else and get it later to restart 
the reader on a particular message.




[incubator-pulsar] branch master updated: CompactedTopic should seek to position of cursor, not next position (#1336)

2018-03-07 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new dad679a  CompactedTopic should seek to position of cursor, not next 
position (#1336)
dad679a is described below

commit dad679ae1392bef4be67c654bcb7d19de79be266
Author: Ivan Kelly 
AuthorDate: Wed Mar 7 19:25:26 2018 +0100

CompactedTopic should seek to position of cursor, not next position (#1336)

* CompactedTopic should seek to position of cursor, not next position

When finding the first ledger entry to read from while reading from a
compacted topic ledger, we should find the position represented by the
cursor read position, not the next position after as has been done
until now.

This bug was due to a misunderstanding of how the read position
works. It was assumed it worked the same as the mark-delete position,
which is not the case.

* Fix clash on message cleanup
---
 .../pulsar/compaction/CompactedTopicImpl.java  |  6 +--
 .../pulsar/compaction/CompactedTopicTest.java  | 27 ++--
 .../apache/pulsar/compaction/CompactionTest.java   | 48 ++
 3 files changed, 63 insertions(+), 18 deletions(-)
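
The off-by-one described in the commit message can be illustrated with a small synchronous sketch. It is not the broker code: `CompactedLookupSketch`, the `long[]` of entry ids, and the plain `<=` on longs are assumptions standing in for the asynchronous `findStartPointLoop` and `comparePositionAndMessageId` shown in the diff below. With `<=`, a lookup for a position that exactly matches an entry returns that entry; with the previous strict `<`, the same lookup for 5 would have returned index 2, the entry after it.

```java
// Hypothetical, simplified model of the start-point search in a compacted ledger.
// Entry ids are sorted ascending; the read position p is a plain long here.
final class CompactedLookupSketch {
    static final long NEWER_THAN_COMPACTED = -1L;

    /** Returns the index of the first entry whose id is >= p, or NEWER_THAN_COMPACTED. */
    static long findStartPoint(long p, long[] ids) {
        if (ids.length == 0) {
            return NEWER_THAN_COMPACTED;
        }
        return findStartPointLoop(p, 0, ids.length - 1, ids);
    }

    private static long findStartPointLoop(long p, int start, int end, long[] ids) {
        int midpoint = start + (end - start) / 2;
        if (p <= ids[start]) {
            // The read position is at or before the first entry in range.
            return start;
        } else if (p <= ids[midpoint]) {
            return findStartPointLoop(p, start, midpoint, ids);
        } else if (p <= ids[end]) {
            return findStartPointLoop(p, midpoint + 1, end, ids);
        } else {
            // Every compacted entry is older than the read position.
            return NEWER_THAN_COMPACTED;
        }
    }

    public static void main(String[] args) {
        long[] ids = {2, 5, 9};
        System.out.println(findStartPoint(5, ids));  // prints 1 (exact match)
        System.out.println(findStartPoint(6, ids));  // prints 2 (first entry after a gap)
        System.out.println(findStartPoint(10, ids)); // prints -1 (newer than compacted)
    }
}
```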

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e70381e..b1378b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -133,11 +133,11 @@ public class CompactedTopicImpl implements CompactedTopic 
{
 
 CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
 () -> {
-if (comparePositionAndMessageId(p, startEntry.join()) < 0) 
{
+if (comparePositionAndMessageId(p, startEntry.join()) <= 
0) {
 promise.complete(start);
-} else if (comparePositionAndMessageId(p, 
middleEntry.join()) < 0) {
+} else if (comparePositionAndMessageId(p, 
middleEntry.join()) <= 0) {
 findStartPointLoop(p, start, midpoint, promise, cache);
-} else if (comparePositionAndMessageId(p, endEntry.join()) 
< 0) {
+} else if (comparePositionAndMessageId(p, endEntry.join()) 
<= 0) {
 findStartPointLoop(p, midpoint + 1, end, promise, 
cache);
 } else {
 promise.complete(NEWER_THAN_COMPACTED);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index eb549fa..69d9cfb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -52,7 +52,7 @@ import io.netty.buffer.Unpooled;
 import lombok.Cleanup;
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
-private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
+private final Random r = new Random(0);
 
 @BeforeMethod
 @Override
@@ -77,9 +77,8 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
  * entries in the ledger, and a list of gaps, and the entry which should 
be returned after the gap.
  */
 private Triple>, 
List>>
-buildCompactedLedger(BookKeeper bk, int seed, int count)
+buildCompactedLedger(BookKeeper bk, int count)
 throws Exception {
-Random r = new Random(seed);
 LedgerHandle lh = bk.createLedger(1, 1,
   
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
   
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
@@ -112,10 +111,12 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
 .setEntryId(entryIds.addAndGet(delta + 1)).build();
 
 @Cleanup
-RawMessage m = new RawMessageImpl(id, emptyBuffer);
+RawMessage m = new RawMessageImpl(id, 
Unpooled.EMPTY_BUFFER);
 
 CompletableFuture f = new CompletableFuture<>();
-lh.asyncAddEntry(m.serialize(),
+ByteBuf buffer = m.serialize();
+
+lh.asyncAddEntry(buffer,
 (rc, ledger, eid, ctx) -> {
  if (rc != BKException.Code.OK) {
  
f.completeExceptionally(BKException.create(rc));
@@ -125,6 +126,7 @@ public class CompactedTop

[GitHub] merlimat closed pull request #1336: CompactedTopic should seek to position of cursor, not next position

2018-03-07 Thread GitBox
merlimat closed pull request #1336: CompactedTopic should seek to position of 
cursor, not next position
URL: https://github.com/apache/incubator-pulsar/pull/1336
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e70381edb..b1378b648 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -133,11 +133,11 @@ private static void findStartPointLoop(PositionImpl p, 
long start, long end,
 
 CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
 () -> {
-if (comparePositionAndMessageId(p, startEntry.join()) < 0) 
{
+if (comparePositionAndMessageId(p, startEntry.join()) <= 
0) {
 promise.complete(start);
-} else if (comparePositionAndMessageId(p, 
middleEntry.join()) < 0) {
+} else if (comparePositionAndMessageId(p, 
middleEntry.join()) <= 0) {
 findStartPointLoop(p, start, midpoint, promise, cache);
-} else if (comparePositionAndMessageId(p, endEntry.join()) 
< 0) {
+} else if (comparePositionAndMessageId(p, endEntry.join()) 
<= 0) {
 findStartPointLoop(p, midpoint + 1, end, promise, 
cache);
 } else {
 promise.complete(NEWER_THAN_COMPACTED);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index eb549fa44..69d9cfbc7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -52,7 +52,7 @@
 import lombok.Cleanup;
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
-private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
+private final Random r = new Random(0);
 
 @BeforeMethod
 @Override
@@ -77,9 +77,8 @@ public void cleanup() throws Exception {
  * entries in the ledger, and a list of gaps, and the entry which should 
be returned after the gap.
  */
 private Triple>, 
List>>
-buildCompactedLedger(BookKeeper bk, int seed, int count)
+buildCompactedLedger(BookKeeper bk, int count)
 throws Exception {
-Random r = new Random(seed);
 LedgerHandle lh = bk.createLedger(1, 1,
   
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
   
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
@@ -112,10 +111,12 @@ public void cleanup() throws Exception {
 .setEntryId(entryIds.addAndGet(delta + 1)).build();
 
 @Cleanup
-RawMessage m = new RawMessageImpl(id, emptyBuffer);
+RawMessage m = new RawMessageImpl(id, 
Unpooled.EMPTY_BUFFER);
 
 CompletableFuture f = new CompletableFuture<>();
-lh.asyncAddEntry(m.serialize(),
+ByteBuf buffer = m.serialize();
+
+lh.asyncAddEntry(buffer,
 (rc, ledger, eid, ctx) -> {
  if (rc != BKException.Code.OK) {
  
f.completeExceptionally(BKException.create(rc));
@@ -125,6 +126,7 @@ public void cleanup() throws Exception {
  f.complete(null);
  }
 }, null);
+buffer.release();
 return f;
 }).toArray(CompletableFuture[]::new)).get();
 lh.close();
@@ -138,7 +140,7 @@ public void testEntryLookup() throws Exception {
 this.conf, null);
 
 Triple>, List>> compactedLedgerData
-= buildCompactedLedger(bk, 0, 500);
+= buildCompactedLedger(bk, 500);
 
 List> positions = 
compactedLedgerData.getMiddle();
 List> idsInGaps = 
compactedLedgerData.getRight();
@@ -170,19 +172,14 @@ public void testEntryLookup() throws Exception {
 
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
 
 // shuffle to make cache work hard
-Collections.shuffle(positions);
-Collections.shuffle(idsInGaps);
+Collecti

[GitHub] merlimat commented on a change in pull request #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
merlimat commented on a change in pull request #1322: Use private impl for 
MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#discussion_r172937375
 
 

 ##
 File path: pulsar-client-cpp/lib/MessageId.cc
 ##
 @@ -30,43 +29,51 @@
 
 namespace pulsar {
 
-MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {}
+class MessageIdImpl {
 
 Review comment:
   Sure




[GitHub] merlimat commented on issue #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
merlimat commented on issue #1355: Remove as many Thread.sleep calls from REST 
endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355#issuecomment-371232050
 
 
   @mgodave This change doesn't solve the problem that the ugly 
`Thread.sleep()` was introduced as a workaround for.
   
   The problem is not the sync vs async ZK call (though sure, the async with 
response continuation is more efficient). 
   
   The real problem is that in ZK there is no read-your-write consistency by 
default. That is also aggravated by the fact that we have a cache for 
configuration/metadata that is updated with ZK watches.
   
   This particular instance of the problem typically manifests itself only in a 
test environment where there are multiple brokers and multiple ZK servers in 
the ensemble. 
   
   It goes like this: 
* Create a partitioned topic --> create a z-node with the partitions info
* Try to use the partitioned topic immediately after
   1. The request to check the partitioned topic metadata can go to a 
different broker
   2. That broker is connected to a different ZK server
   3. That ZK server is a few steps behind the ZK quorum
   4. The partitioned topic metadata is not there in ZK
   
   The "good" solution would be to always do a ZK `sync()` before reading the 
partitioned topic metadata. That would ensure that we can actually read our 
previous write.
   
   The problem with that is that we have to do a ZK write (implied by `sync()`) 
and that we can never cache the info in the broker.
   
   + @ivankelly if you have any other suggestion
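
The stale-read sequence and the `sync()`-before-read remedy described above can be sketched with a toy in-memory model. This is only an illustration under loud assumptions: `Leader`, `LaggingServer`, and the path string are hypothetical names, nothing here talks to ZooKeeper, and a real client would use the ZooKeeper `sync` call followed by a read on the same session.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not the ZooKeeper API): one server lags behind the committed state
// until it is explicitly synced.
final class ReadYourWritesSketch {
    /** Stands in for the state committed by the ZK quorum. */
    static final class Leader {
        final Map<String, String> data = new HashMap<>();

        void create(String path, String value) {
            data.put(path, value);
        }
    }

    /** Stands in for a ZK server that has not yet applied the latest writes. */
    static final class LaggingServer {
        private final Leader leader;
        private final Map<String, String> local = new HashMap<>();

        LaggingServer(Leader leader) {
            this.leader = leader;
        }

        /** Catches this server up with everything the leader has committed. */
        void sync() {
            local.putAll(leader.data);
        }

        /** Reads from local state only, so the result may be stale. */
        String read(String path) {
            return local.get(path);
        }
    }

    public static void main(String[] args) {
        Leader leader = new Leader();
        LaggingServer server = new LaggingServer(leader);
        String path = "/partitioned-topics/my-topic"; // hypothetical z-node path

        leader.create(path, "4");              // broker A creates the metadata
        System.out.println(server.read(path)); // prints null (broker B reads via a lagging server)

        server.sync();                         // sync before reading
        System.out.println(server.read(path)); // prints 4
    }
}
```

The sketch also shows the cost mentioned above: every read that must observe the latest write has to pay for a `sync()` first, which rules out serving the value from a broker-side cache.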
   




[GitHub] merlimat commented on a change in pull request #1322: Use private impl for MessageId in c++ client

2018-03-07 Thread GitBox
merlimat commented on a change in pull request #1322: Use private impl for 
MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322#discussion_r172932307
 
 

 ##
 File path: pulsar-client-cpp/lib/MessageId.cc
 ##
 @@ -30,43 +29,51 @@
 
 namespace pulsar {
 
-MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {}
+class MessageIdImpl {
+   public:
+MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), 
batchIndex_(-1) {}
 
 Review comment:
   But can we use c++11??? You guys tell me.. :) 




[GitHub] mgodave opened a new pull request #1355: Remove as many Thread.sleep calls from REST endpoint

2018-03-07 Thread GitBox
mgodave opened a new pull request #1355: Remove as many Thread.sleep calls from 
REST endpoint
URL: https://github.com/apache/incubator-pulsar/pull/1355
 
 
   Thread.sleep is rarely the correct answer as it can lead to race conditions 
and incorrect results. In this case sleeping the thread will tie up an entire 
thread in the REST endpoint. The operations that are being slept are themselves 
async and can easily be represented as Futures (CompletableFutures). 
Furthermore the REST framework we are using supports async results. This has 
the benefit of still allowing timeouts on the endpoint, allowing an operation 
to fail or complete, and not tie up an entire serving thread while an already 
async operation completes. There is still one remaining Thread.sleep that I 
would like to squash but I wanted to put this work up for consideration.
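
A minimal sketch of the idea, assuming only the JDK: instead of calling `Thread.sleep` on the serving thread, the delay is scheduled and the result is delivered through a `CompletableFuture`. The class name `DelayedCompletionSketch`, the helper `completeAfter`, and the 50 ms delay are illustrative assumptions, not code from this pull request; in a REST endpoint the final future would be handed to the framework's async response mechanism rather than waited on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: delays a result without parking the calling thread.
final class DelayedCompletionSketch {
    /** Returns a future that completes with value once the delay has elapsed. */
    static <T> CompletableFuture<T> completeAfter(
            ScheduledExecutorService scheduler, T value, long delay, TimeUnit unit) {
        CompletableFuture<T> future = new CompletableFuture<>();
        // The scheduler thread completes the future; the caller returns immediately.
        scheduler.schedule(() -> { future.complete(value); }, delay, unit);
        return future;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stands in for an already-async operation such as a metadata write.
            CompletableFuture<String> created = CompletableFuture.completedFuture("topic-created");

            // Chain the delay onto the operation instead of sleeping after it.
            CompletableFuture<String> response = created.thenCompose(
                    result -> completeAfter(scheduler, result, 50, TimeUnit.MILLISECONDS));

            // Only this demo blocks; it also shows that a timeout can still be applied.
            System.out.println(response.get(5, TimeUnit.SECONDS)); // prints topic-created
        } finally {
            scheduler.shutdown();
        }
    }
}
```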




[GitHub] ivankelly commented on issue #1145: Intermittent test failure in PersistentTopicTest.testClosingReplicationProducerTwice

2018-03-07 Thread GitBox
ivankelly commented on issue #1145: Intermittent test failure in 
PersistentTopicTest.testClosingReplicationProducerTwice
URL: 
https://github.com/apache/incubator-pulsar/issues/1145#issuecomment-371220837
 
 
   Found the race. The builder stuff messed up this test completely, but the 
race is still there. To see it, just put a 1s sleep after creating the replicator. The problem 
is ultimately that the mock should be applied before creating the replicator, 
because the replicator calls startProducer on construction, which will fail, 
and then calls it again after a backoff, which is where the extra invocations come from.
   
   I'll submit a patch tomorrow.




[GitHub] ivankelly commented on issue #1145: Intermittent test failure in PersistentTopicTest.testClosingReplicationProducerTwice

2018-03-07 Thread GitBox
ivankelly commented on issue #1145: Intermittent test failure in 
PersistentTopicTest.testClosingReplicationProducerTwice
URL: 
https://github.com/apache/incubator-pulsar/issues/1145#issuecomment-371209316
 
 
   Are there any logs besides that snippet? 




[GitHub] maskit commented on issue #1266: Pass all Apache Podling Website Checks

2018-03-07 Thread GitBox
maskit commented on issue #1266: Pass all Apache Podling Website Checks
URL: 
https://github.com/apache/incubator-pulsar/issues/1266#issuecomment-371205912
 
 
   Generated a 212px width PNG logo image from the SVG in site/img/ that 
satisfies the requirement[1].
   
   
![pulsar](https://user-images.githubusercontent.com/153144/37105921-c014356c-2273-11e8-9a79-108372169b58.png)
   
   
   [1] 
https://svn.apache.org/repos/asf/infrastructure/site/trunk/content/img/HEADER.html
   
   




[GitHub] zhaijack opened a new pull request #1354: Issue 1118: refine handlerBase to let only ConsumerImpl and ProducerImpl have client-cnx

2018-03-07 Thread GitBox
zhaijack opened a new pull request #1354: Issue 1118: refine handlerBase to let 
only ConsumerImpl and ProducerImpl have client-cnx
URL: https://github.com/apache/incubator-pulsar/pull/1354
 
 
   ### Motivation
   
   Class HandlerBase contains 2 parts:
   - handler state; this is needed by all handlers.
   - client_cnx connection handling; this part is only useful in ConsumerImpl and 
ProducerImpl, and is not needed by PartitionedConsumer, PartitionedProducer, or 
TopicsConsumer.
   
   This change splits HandlerBase into 2 parts, so that client_cnx connection 
handling is only present in ConsumerImpl and ProducerImpl.
   
   ### Modifications
   
   Split HandlerBase into 2 classes: 
   - HandlerState: handles the state of every consumer/producer;
   - ConnectionHandler: handles the client_cnx connection for ConsumerImpl and 
ProducerImpl.
   
   ### Result
   no api changes. 
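
The shape of the split can be sketched roughly as follows. This is a hypothetical outline, not the code in the pull request: the `State` values, the method names, the `String` standing in for a client connection, and the two consumer classes are all assumptions made to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical outline of separating handler state from connection handling.
final class HandlerSplitSketch {
    enum State { UNINITIALIZED, READY, CLOSED }

    /** State tracking that every producer or consumer needs. */
    static class HandlerState {
        private final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED);

        State getState() {
            return state.get();
        }

        boolean changeToReady() {
            return state.compareAndSet(State.UNINITIALIZED, State.READY);
        }

        void close() {
            state.set(State.CLOSED);
        }
    }

    /** Connection handling, owned only by handlers that hold a broker connection. */
    static final class ConnectionHandler {
        private final HandlerState owner;
        private final AtomicReference<String> connection = new AtomicReference<>();

        ConnectionHandler(HandlerState owner) {
            this.owner = owner;
        }

        void connectionOpened(String cnx) {
            connection.set(cnx);
            owner.changeToReady();
        }

        String getConnection() {
            return connection.get();
        }
    }

    /** Single-topic consumer: has state and its own connection. */
    static final class SingleTopicConsumer extends HandlerState {
        final ConnectionHandler connectionHandler = new ConnectionHandler(this);
    }

    /** Multi-topic consumer: has state only and delegates connections to its children. */
    static final class MultiTopicConsumer extends HandlerState {
    }

    public static void main(String[] args) {
        SingleTopicConsumer single = new SingleTopicConsumer();
        single.connectionHandler.connectionOpened("cnx-1");
        System.out.println(single.getState()); // prints READY

        MultiTopicConsumer multi = new MultiTopicConsumer();
        multi.changeToReady();
        System.out.println(multi.getState()); // prints READY
    }
}
```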




[GitHub] ivankelly commented on issue #1145: Intermittent test failure in PersistentTopicTest.testClosingReplicationProducerTwice

2018-03-07 Thread GitBox
ivankelly commented on issue #1145: Intermittent test failure in 
PersistentTopicTest.testClosingReplicationProducerTwice
URL: 
https://github.com/apache/incubator-pulsar/issues/1145#issuecomment-371188779
 
 
   This is a dupe of #617 




[GitHub] maskit commented on issue #1258: Fix CID 262329

2018-03-07 Thread GitBox
maskit commented on issue #1258: Fix CID 262329
URL: https://github.com/apache/incubator-pulsar/pull/1258#issuecomment-371185764
 
 
   retest this please




[incubator-pulsar] branch master updated: Have the ability to send log messages to a topic in Python (#1353)

2018-03-07 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a8a595d  Have the ability to send log messages to a topic in Python 
(#1353)
a8a595d is described below

commit a8a595d9124d274f030cabff230e3bddc76cc241
Author: Sanjeev Kulkarni 
AuthorDate: Wed Mar 7 07:29:23 2018 -0800

Have the ability to send log messages to a topic in Python (#1353)

* Have the ability to send log messages to a topic in Python

* Address review comments
---
 pulsar-functions/instance/src/main/python/log.py   | 26 --
 .../instance/src/main/python/python_instance.py| 16 -
 .../src/main/python/python_instance_main.py|  4 +++-
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/log.py 
b/pulsar-functions/instance/src/main/python/log.py
index f36e684..85b2104 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -23,6 +23,7 @@
 ''' log.py '''
 import logging
 from logging.handlers import RotatingFileHandler
+import pulsar
 
 # Create the logger
 # pylint: disable=invalid-name
@@ -34,6 +35,19 @@ Log = logging.getLogger()
 # see time formatter documentation for more
 date_format = "%Y-%m-%d %H:%M:%S %z"
 
+class LogTopicHandler(logging.Handler):
+  def __init__(self, topic_name, pulsar_client):
+Log.info("Setting up producer for log topic %s" % topic_name)
+self.producer = pulsar_client.create_producer(
+  str(topic_name),
+  block_if_queue_full=True,
+  batching_enabled=True,
+  batching_max_publish_delay_ms=100,
+  compression_type=pulsar._pulsar.CompressionType.LZ4)
+
+  def emit(self, record):
+self.producer.send_async(record)
+
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
 
@@ -50,14 +64,22 @@ def configure(level=logging.INFO):
   Log.handlers.remove(handler)
 
   Log.setLevel(level)
+  stream_handler = logging.StreamHandler()
+  add_handler(stream_handler)
 
+def remove_all_handlers():
+  retval = None
+  for handler in Log.handlers:
+Log.handlers.remove(handler)
+retval = handler
+  return retval
+
+def add_handler(stream_handler):
   log_format = "[%(asctime)s] [%(levelname)s]: %(message)s"
   formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
-  stream_handler = logging.StreamHandler()
   stream_handler.setFormatter(formatter)
   Log.addHandler(stream_handler)
 
-
 def init_rotating_logger(level, logfile, max_files, max_bytes):
   """Initializes a rotating logger
 
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 08fcef5..077cb72 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,10 +94,13 @@ class Stats(object):
   return self.latency / self.nsuccessfullyprocessed
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, 
function_config, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, 
function_config, max_buffered_tuples, user_code, log_topic, pulsar_client):
 self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_config, max_buffered_tuples)
 self.user_code = user_code
 self.queue = Queue.Queue(max_buffered_tuples)
+self.log_topic_handler = None
+if log_topic is not None:
+  self.log_topic_handler = log.LogTopicHandler(str(log_topic), 
pulsar_client)
 self.pulsar_client = pulsar_client
 self.input_serdes = {}
 self.consumers = {}
@@ -174,7 +177,11 @@ class PythonInstance(object):
 continue
   self.contextimpl.set_current_message_context(msg.message.message_id(), 
msg.topic)
   output_object = None
+  self.saved_log_handler = None
   try:
+if self.log_topic_handler is not None:
+  self.saved_log_handler = log.remove_all_handlers()
+  log.add_handler(self.log_topic_handler)
 start_time = time.time()
 self.current_stats.increment_processed(int(start_time) * 1000)
 self.total_stats.increment_processed(int(start_time) * 1000)
@@ -188,9 +195,16 @@ class PythonInstance(object):
 self.current_stats.increment_successfully_processed(latency)
 self.process_result(output_object, msg)
   except Exception as e:
+if self.log_topic_handler is not None:
+  log.remove_all_handlers()
+  log.add_handler(self.saved_log_handler)
 Log.exception("Exception while executing user method")
 self.total_stats.record_user_exception(e)
 self.current_stats.record_use

[GitHub] merlimat closed pull request #1353: Have the ability to send log messages to a topic in Python

2018-03-07 Thread GitBox
merlimat closed pull request #1353: Have the ability to send log messages to a 
topic in Python
URL: https://github.com/apache/incubator-pulsar/pull/1353
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index f36e684f2..85b2104e2 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -23,6 +23,7 @@
 ''' log.py '''
 import logging
 from logging.handlers import RotatingFileHandler
+import pulsar
 
 # Create the logger
 # pylint: disable=invalid-name
@@ -34,6 +35,19 @@
 # see time formatter documentation for more
 date_format = "%Y-%m-%d %H:%M:%S %z"
 
+class LogTopicHandler(logging.Handler):
+  def __init__(self, topic_name, pulsar_client):
+    Log.info("Setting up producer for log topic %s" % topic_name)
+    self.producer = pulsar_client.create_producer(
+      str(topic_name),
+      block_if_queue_full=True,
+      batching_enabled=True,
+      batching_max_publish_delay_ms=100,
+      compression_type=pulsar._pulsar.CompressionType.LZ4)
+
+  def emit(self, record):
+    self.producer.send_async(record)
+
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
 
@@ -50,14 +64,22 @@ def configure(level=logging.INFO):
       Log.handlers.remove(handler)
 
   Log.setLevel(level)
+  stream_handler = logging.StreamHandler()
+  add_handler(stream_handler)
 
+def remove_all_handlers():
+  retval = None
+  for handler in Log.handlers:
+    Log.handlers.remove(handler)
+    retval = handler
+  return retval
+
+def add_handler(stream_handler):
   log_format = "[%(asctime)s] [%(levelname)s]: %(message)s"
   formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
-  stream_handler = logging.StreamHandler()
   stream_handler.setFormatter(formatter)
   Log.addHandler(stream_handler)
 
-
 def init_rotating_logger(level, logfile, max_files, max_bytes):
   """Initializes a rotating logger
 
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 08fcef527..077cb723d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,10 +94,13 @@ def compute_latency(self):
       return self.latency / self.nsuccessfullyprocessed
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, log_topic, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_config, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
+    self.log_topic_handler = None
+    if log_topic is not None:
+      self.log_topic_handler = log.LogTopicHandler(str(log_topic), pulsar_client)
     self.pulsar_client = pulsar_client
     self.input_serdes = {}
     self.consumers = {}
@@ -174,7 +177,11 @@ def actual_execution(self):
         continue
       self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
       output_object = None
+      self.saved_log_handler = None
       try:
+        if self.log_topic_handler is not None:
+          self.saved_log_handler = log.remove_all_handlers()
+          log.add_handler(self.log_topic_handler)
         start_time = time.time()
         self.current_stats.increment_processed(int(start_time) * 1000)
         self.total_stats.increment_processed(int(start_time) * 1000)
@@ -188,9 +195,16 @@ def actual_execution(self):
         self.current_stats.increment_successfully_processed(latency)
         self.process_result(output_object, msg)
       except Exception as e:
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
         Log.exception("Exception while executing user method")
         self.total_stats.record_user_exception(e)
         self.current_stats.record_user_exception(e)
+      finally:
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 3fd43580a..4fe164196 100644
--- a/pulsa
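
For readers following the log.py change in this PR, here is a minimal, self-contained sketch of the same idea: a `logging.Handler` that forwards records to a producer, installed only while user code runs and then swapped back out. It is an illustration under stated assumptions, not the code that was merged. `FakeProducer`, `TopicLogHandler`, and `swap_handlers` are hypothetical names; the fake producer stands in for whatever `pulsar_client.create_producer(...)` returns so the snippet runs without a broker, and its `(content, callback)` signature is an assumption. The sketch differs from the diff in three ways: it calls `logging.Handler.__init__`, it formats the record to bytes before sending, and it copies the handler list before removing entries (removing from a list while iterating over it skips elements).

```python
import logging


class FakeProducer(object):
    """Hypothetical stand-in for a Pulsar producer; it only records payloads."""

    def __init__(self):
        self.sent = []

    def send_async(self, content, callback):
        # Assumed (content, callback) shape; nothing is sent over the network.
        self.sent.append(content)
        if callback is not None:
            callback("ok", None)  # placeholder result and message id


class TopicLogHandler(logging.Handler):
    """Forwards each formatted log record to a producer-like object."""

    def __init__(self, producer):
        logging.Handler.__init__(self)  # initializes level, filters and lock
        self.producer = producer

    def emit(self, record):
        try:
            payload = self.format(record).encode("utf-8")
            self.producer.send_async(payload, None)
        except Exception:
            self.handleError(record)


def swap_handlers(logger, new_handlers):
    """Replace every handler on `logger`; return the previously installed ones."""
    saved = list(logger.handlers)  # copy so we never mutate while iterating
    for handler in saved:
        logger.removeHandler(handler)
    for handler in new_handlers:
        logger.addHandler(handler)
    return saved


# Demo logger, kept separate from the root logger on purpose.
demo_log = logging.getLogger("log_topic_demo")
demo_log.setLevel(logging.INFO)
demo_log.propagate = False
demo_log.addHandler(logging.NullHandler())

producer = FakeProducer()
topic_handler = TopicLogHandler(producer)
topic_handler.setFormatter(logging.Formatter("[%(levelname)s]: %(message)s"))

saved_handlers = swap_handlers(demo_log, [topic_handler])
try:
    demo_log.info("hello from user code")  # goes to the producer only
finally:
    swap_handlers(demo_log, saved_handlers)  # restore on success and on error
```

Restoring in `finally` alone covers both the success path and the exception path, so a second restore inside `except` is not needed in this sketch.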

[GitHub] XiaoZYang commented on issue #1219: Issue 1069: Provide a setting in consumer configuration to specify where to start consuming messages

2018-03-07 Thread GitBox
XiaoZYang commented on issue #1219: Issue 1069: Provide a setting in consumer 
configuration to specify where to start consuming messages 
URL: https://github.com/apache/incubator-pulsar/pull/1219#issuecomment-371149305
 
 
   ping @merlimat 
   Thanks for reviewing this commit.
   Since the earliest position is the first message written to a topic, there 
is no message before it. The counter here is the `ConsumedCount`, which equals 
`all messages - messages in backlog`, so the counter can be set to 0 directly. 
Am I misunderstanding this?
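
To make the arithmetic concrete, a small sketch follows. `consumed_count` is a hypothetical helper written for illustration, not broker code, and the numbers are made up.

```python
def consumed_count(total_messages, backlog):
    """ConsumedCount as described above: all messages minus messages in backlog."""
    if backlog < 0 or backlog > total_messages:
        raise ValueError("backlog must be between 0 and total_messages")
    return total_messages - backlog


# A subscription positioned at the earliest message still has every
# message in its backlog, so nothing has been consumed yet.
total = 1000
at_earliest = consumed_count(total, backlog=total)
partly_read = consumed_count(total, backlog=250)
```

With the backlog equal to the total, the helper returns 0, which matches setting the counter to 0 directly.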


[GitHub] zhaijack closed issue #1237: Make subscribe for TopicsConsumer using builder

2018-03-07 Thread GitBox
zhaijack closed issue #1237: Make subscribe for TopicsConsumer using builder
URL: https://github.com/apache/incubator-pulsar/issues/1237
 
 
   


[GitHub] yush1ga commented on a change in pull request #1352: Delete inactive subscriptions automatically

2018-03-07 Thread GitBox
yush1ga commented on a change in pull request #1352: Delete inactive 
subscriptions automatically
URL: https://github.com/apache/incubator-pulsar/pull/1352#discussion_r172765485
 
 

 ##
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 ##
 @@ -812,6 +820,15 @@ public void checkMessageDeduplicationInfo() {
         });
     }
 
+    public void checkInactiveSubscriptions() {
+        topics.forEach((n, t) -> {
+            Topic topic = t.getNow(null);
 
 Review comment:
   I see, thank you.


[GitHub] yush1ga commented on issue #1352: Delete inactive subscriptions automatically

2018-03-07 Thread GitBox
yush1ga commented on issue #1352: Delete inactive subscriptions automatically
URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-371057712
 
 
   @merlimat 
   > Another question is: how can this be enforced across brokers restarts?
   
   Did you mean that the scheduler for the inactive subscription checker is 
reset when a broker restarts?

