[GitHub] aahmed-se commented on issue #2456: Remove Aerospike Connector

2018-08-27 Thread GitBox
aahmed-se commented on issue #2456: Remove Aerospike Connector
URL: https://github.com/apache/incubator-pulsar/pull/2456#issuecomment-416458802
 
 
   Rebased on master


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aahmed-se commented on issue #2434: Add pulsar flink sink connector

2018-08-27 Thread GitBox
aahmed-se commented on issue #2434: Add pulsar flink sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416458754
 
 
   should be fixed


[GitHub] codelipenghui commented on a change in pull request #2460: Add backlog quota retention policy to server config.

2018-08-27 Thread GitBox
codelipenghui commented on a change in pull request #2460: Add backlog quota retention policy to server config.
URL: https://github.com/apache/incubator-pulsar/pull/2460#discussion_r213189938
 
 

 ##
 File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 ##
 @@ -86,6 +86,11 @@
 private int backlogQuotaCheckIntervalInSeconds = 60;
 // Default per-topic backlog quota limit
 private long backlogQuotaDefaultLimitGB = 50;
+//Default backlog quota retention policy. Default is producer_request_hold
+//'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
+//'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
+//'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+private String backlogQuotaRetentionPolicy = "producer_request_hold";
 
 Review comment:
   Yeah, there's already an enum in BacklogQuota named RetentionPolicy:
   ```java
   public static enum RetentionPolicy {
       /** Policy which holds producer's send request until the resource becomes available (or holding times out) */
       producer_request_hold,

       /** Policy which throws javax.jms.ResourceAllocationException to the producer */
       producer_exception,

       /** Policy which evicts the oldest message from the slowest consumer's backlog */
       consumer_backlog_eviction,
   }
   ```
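The reviewer's suggestion can be sketched as follows, assuming the new setting is still read from the config file as a string and then mapped onto the enum with `Enum.valueOf`. This is a minimal sketch, not Pulsar's code: the enum is a local copy of the one quoted above so the example compiles on its own, and `parseRetentionPolicy`, its trimming and lower-casing, and the fallback to `producer_request_hold` for an empty value are hypothetical choices.

```java
import java.util.Locale;

public class RetentionPolicyConfigSketch {

    // Local copy of the enum quoted in the review comment, so this sketch is self-contained.
    public enum RetentionPolicy {
        producer_request_hold,
        producer_exception,
        consumer_backlog_eviction
    }

    // Hypothetical helper: converts a raw config string into the enum and
    // fails fast with a clear message when the value is not a known policy.
    static RetentionPolicy parseRetentionPolicy(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            // Assumed default, matching the comment in the proposed config change.
            return RetentionPolicy.producer_request_hold;
        }
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        try {
            return RetentionPolicy.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown backlog quota retention policy: " + raw, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseRetentionPolicy("producer_exception"));          // producer_exception
        System.out.println(parseRetentionPolicy(" Consumer_Backlog_Eviction ")); // consumer_backlog_eviction
        System.out.println(parseRetentionPolicy(null));                          // producer_request_hold
    }
}
```

Reusing the enum keeps the set of valid values in one place, and rejecting unknown names surfaces a typo in the broker config at startup instead of silently applying some other policy.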


[GitHub] sijie commented on issue #2456: Remove Aerospike Connector

2018-08-27 Thread GitBox
sijie commented on issue #2456: Remove Aerospike Connector
URL: https://github.com/apache/incubator-pulsar/pull/2456#issuecomment-416456609
 
 
   @aahmed-se you need to rebase your branch


[GitHub] sijie commented on issue #2380: Increase default maxConcurrentLookupRequests to 50k

2018-08-27 Thread GitBox
sijie commented on issue #2380: Increase default maxConcurrentLookupRequests to 50k
URL: https://github.com/apache/incubator-pulsar/pull/2380#issuecomment-416456387
 
 
   run java8 tests


[GitHub] sijie closed pull request #2376: adding tiered storage confs to endpoint

2018-08-27 Thread GitBox
sijie closed pull request #2376: adding tiered storage confs to endpoint
URL: https://github.com/apache/incubator-pulsar/pull/2376
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f6319aeaa7..f5240c989d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -62,7 +62,7 @@
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
 import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
@@ -660,14 +660,13 @@ public LedgerOffloader getManagedLedgerOffloader() {
 static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
 static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
 
-
 public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
 throws PulsarServerException {
 if (conf.getManagedLedgerOffloadDriver() != null
 && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
 {
 try {
 return BlobStoreManagedLedgerOffloader.create(
-getTieredStorageConf(conf),
+getTieredStorageConf(),
 ImmutableMap.of(
 METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
 METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha()
@@ -681,23 +680,23 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur
 }
 }
 
-private static TieredStorageConfigurationData getTieredStorageConf(ServiceConfiguration serverConf) {
+public TieredStorageConfigurationData getTieredStorageConf() {
 TieredStorageConfigurationData tsConf = new TieredStorageConfigurationData();
 // generic settings
-tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
-tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
+tsConf.setManagedLedgerOffloadDriver(config.getManagedLedgerOffloadDriver());
+tsConf.setManagedLedgerOffloadMaxThreads(config.getManagedLedgerOffloadMaxThreads());
 // s3 settings
-tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
-tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
-tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
-tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
-tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
+tsConf.setS3ManagedLedgerOffloadRegion(config.getS3ManagedLedgerOffloadRegion());
+tsConf.setS3ManagedLedgerOffloadBucket(config.getS3ManagedLedgerOffloadBucket());
+tsConf.setS3ManagedLedgerOffloadServiceEndpoint(config.getS3ManagedLedgerOffloadServiceEndpoint());
+tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(config.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(config.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
 // gcs settings
-tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
-tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
-tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
-tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
-tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
+tsConf.setGcsManagedLedgerOffloadRegion(config.getGcsManagedLedgerOffloadRegion());
+

[GitHub] sijie commented on issue #2376: adding tiered storage confs to endpoint

2018-08-27 Thread GitBox
sijie commented on issue #2376: adding tiered storage confs to endpoint
URL: https://github.com/apache/incubator-pulsar/pull/2376#issuecomment-416456296
 
 
   This is not needed any more.


[incubator-pulsar] branch master updated: Fix flake in DiscoveryServiceTest (#1081) (#2406)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 774cafa  Fix flake in DiscoveryServiceTest (#1081) (#2406)
774cafa is described below

commit 774cafa25d6340c778c5c281045446086aff96a0
Author: Ivan Kelly 
AuthorDate: Tue Aug 28 07:30:42 2018 +0200

Fix flake in DiscoveryServiceTest (#1081) (#2406)

This test was flaking because it was only waiting for 1 second for
connection and message exchange to complete, which is not enough time
when there's heavy load on the machine (simulated with stress-ng).

The fix is to increase the timeout to 10 seconds. I've also cleaned up
the test to use a CompletableFuture rather than a CountDownLatch so
that the test thread can be notified of failures in the handlers.
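The latch-versus-future point in the commit message can be illustrated with a small standalone sketch (not the actual test code). A plain thread stands in for the Netty handler and either completes the future with a value or completes it exceptionally. With a `CountDownLatch`, the waiting thread can only observe "counted down" or "timed out"; with `CompletableFuture.get(timeout, unit)`, a handler failure reaches the test thread as an `ExecutionException` carrying the original cause. The `simulateHandler` and `await` helpers and the string payloads are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureVsLatchSketch {

    // Hypothetical stand-in for a Netty handler: runs on another thread and
    // reports either a result or a failure through the shared future.
    static CompletableFuture<String> simulateHandler(boolean fail) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        Thread handler = new Thread(() -> {
            if (fail) {
                promise.completeExceptionally(new IllegalStateException("handler failed"));
            } else {
                promise.complete("CONNECTED");
            }
        });
        handler.start();
        return promise;
    }

    // Waits up to 10 seconds (the timeout chosen in this commit) and returns
    // either the result or a short description of what went wrong.
    static String await(CompletableFuture<String> promise) {
        try {
            return promise.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // The handler's own exception is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(simulateHandler(false))); // CONNECTED
        System.out.println(await(simulateHandler(true)));  // failed: handler failed
    }
}
```

The longer timeout only bounds the worst case: on the success path `get` returns as soon as the handler completes the future, so a fast machine does not pay the full 10 seconds.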
---
 .../discovery/service/DiscoveryServiceTest.java| 77 --
 1 file changed, 43 insertions(+), 34 deletions(-)

diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 7d4a5ee..24bcd6a 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.discovery.service;
 import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
@@ -31,16 +30,17 @@ import java.net.URISyntaxException;
 import java.security.PrivateKey;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
@@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
 
+private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class);
+
 private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
 private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
 
@@ -120,48 +125,40 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
 
 /**
  * It verifies: client connects to Discovery-service and receives discovery response successfully.
- * 
+ *
  * @throws Exception
  */
 @Test
 public void testClientServerConnection() throws Exception {
 addBrokerToZk(2);
-// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
-final int messageTransfer = 2;
-final CountDownLatch latch = new CountDownLatch(messageTransfer);
-NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false);
-try {
-assertTrue(latch.await(1, TimeUnit.SECONDS));
-} catch (InterruptedException e) {
-fail("should have received lookup response message from server", e);
-}
+
+final CompletableFuture promise = new CompletableFuture<>();
+NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false);
+assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED);
 workerGroup.shutdownGracefully();
 }
 
 @Test
 public void testClientServerConnectionTls() throws Exception {
 addBrokerToZk(2);
-// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
-final int messageTransfer = 2;
-final 

[GitHub] sijie commented on issue #2434: Add pulsar flink sink connector

2018-08-27 Thread GitBox
sijie commented on issue #2434: Add pulsar flink sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416456095
 
 
   @aahmed-se there are some license header issues. PTAL
   
   ```
   2018-08-27\T\19:50:00.378 [ERROR] Failed to execute goal com.mycila:license-maven-plugin:3.0.rc1:check (default-cli) on project pulsar-flink: Some files do not have the expected license header -> [Help 1]
   ```


[GitHub] sijie closed pull request #2406: Fix flake in DiscoveryServiceTest (#1081)

2018-08-27 Thread GitBox
sijie closed pull request #2406: Fix flake in DiscoveryServiceTest (#1081)
URL: https://github.com/apache/incubator-pulsar/pull/2406
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 7d4a5eeefc..24bcd6a302 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -21,7 +21,6 @@
 import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
@@ -31,16 +30,17 @@
 import java.security.PrivateKey;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
@@ -66,8 +66,13 @@
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
 
+private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class);
+
 private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
 private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
 
@@ -120,48 +125,40 @@ public void testGetPartitionsMetadata() throws Exception {
 
 /**
  * It verifies: client connects to Discovery-service and receives discovery response successfully.
- * 
+ *
  * @throws Exception
  */
 @Test
 public void testClientServerConnection() throws Exception {
 addBrokerToZk(2);
-// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
-final int messageTransfer = 2;
-final CountDownLatch latch = new CountDownLatch(messageTransfer);
-NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false);
-try {
-assertTrue(latch.await(1, TimeUnit.SECONDS));
-} catch (InterruptedException e) {
-fail("should have received lookup response message from server", e);
-}
+
+final CompletableFuture promise = new CompletableFuture<>();
+NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false);
+assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED);
 workerGroup.shutdownGracefully();
 }
 
 @Test
 public void testClientServerConnectionTls() throws Exception {
 addBrokerToZk(2);
-// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
-final int messageTransfer = 2;
-final CountDownLatch latch = new CountDownLatch(messageTransfer);
-NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true);
-try {
-assertTrue(latch.await(1, TimeUnit.SECONDS));
-} catch (InterruptedException e) {
-fail("should have received lookup response message from server", e);
-}
+
+final CompletableFuture promise = new CompletableFuture<>();
+NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), promise, true);
+assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED);
 workerGroup.shutdownGracefully();
 }
 
 /**
  * creates ClientHandler channel to connect and communicate with server
- * 
+ *
  * 

[GitHub] sijie commented on issue #2454: Fix compilation issue on CLITest

2018-08-27 Thread GitBox
sijie commented on issue #2454: Fix compilation issue on CLITest
URL: https://github.com/apache/incubator-pulsar/pull/2454#issuecomment-416455575
 
 
   run java8 tests


[GitHub] sijie closed pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages

2018-08-27 Thread GitBox
sijie closed pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
URL: https://github.com/apache/incubator-pulsar/pull/2455
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md
index 2433eb941f..421d45797b 100644
--- a/site2/docs/client-libraries-go.md
+++ b/site2/docs/client-libraries-go.md
@@ -34,7 +34,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar
 Or you can use [dep](https://github.com/golang/dep) for managing the dependencies.
 
 ```bash
-$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}}
+$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}}
 ```
 
 Once installed locally, you can import it into your project:
diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md
index 78ea557b6f..b5793c6e4a 100644
--- a/site2/docs/deploy-bare-metal.md
+++ b/site2/docs/deploy-bare-metal.md
@@ -125,12 +125,12 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md
index 40904512d0..0dcf024a40 100644
--- a/site2/docs/getting-started-standalone.md
+++ b/site2/docs/getting-started-standalone.md
@@ -87,12 +87,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 042ce8e48b..914429b30b 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -69,12 +69,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md b/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md
index 04fdae5302..206f3e2a28 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md
@@ -35,7 +35,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar
 Or you can use [dep](https://github.com/golang/dep) for managing the dependencies.
 
 ```bash
-$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}}
+$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}}
 ```
 
 Once installed locally, you can import it into your project:
diff --git 
a/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md 
b/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md
index 23b2121be8..feeb005cdc 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md
+++ 

[incubator-pulsar] branch master updated: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8da0669  Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455)
8da0669 is described below

commit 8da066947b339d1f71025f8d3857bb7e5b9e70a2
Author: Matteo Merli 
AuthorDate: Mon Aug 27 22:26:10 2018 -0700

Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455)

### Motivation

In a few places on the website, we're displaying the literal `{{pulsar.version}}` instead of the actual Pulsar version string. The variable name must use `:` as the separator.
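To illustrate why only the `:` form gets replaced, here is a hypothetical Java sketch of a placeholder substituter that recognizes `{{pulsar:<name>}}` tokens and leaves everything else untouched. The real website build performs this substitution with its own tooling, which is not shown here; the regex, the variable map, and the version string below are assumptions for illustration only.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches {{pulsar:<name>}}; the "." form such as {{pulsar.version}} does not match.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{pulsar:(\\w+)\\}\\}");

    static String substitute(String text, Map<String, String> vars) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer(); // StringBuffer overload keeps this Java 8 compatible
        while (m.find()) {
            String value = vars.get(m.group(1));
            // Unknown names are left as-is rather than replaced with "null".
            String replacement = value != null ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Collections.singletonMap("version", "2.1.0-incubating");
        System.out.println(substitute("pulsar-io-kafka-{{pulsar:version}}.nar", vars)); // pulsar-io-kafka-2.1.0-incubating.nar
        System.out.println(substitute("pulsar-io-kafka-{{pulsar.version}}.nar", vars)); // pulsar-io-kafka-{{pulsar.version}}.nar
    }
}
```

Because the `.` variant never matches the pattern, it passes through unchanged and shows up literally on the rendered page, which is the symptom this commit fixes.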
---
 site2/docs/client-libraries-go.md|  2 +-
 site2/docs/deploy-bare-metal.md  | 12 ++--
 site2/docs/getting-started-standalone.md | 12 ++--
 site2/docs/io-quickstart.md  | 12 ++--
 .../version-2.1.0-incubating/client-libraries-go.md  |  2 +-
 .../version-2.1.0-incubating/deploy-bare-metal.md| 12 ++--
 .../version-2.1.0-incubating/getting-started-standalone.md   | 12 ++--
 .../versioned_docs/version-2.1.0-incubating/io-quickstart.md | 12 ++--
 8 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md
index 2433eb9..421d457 100644
--- a/site2/docs/client-libraries-go.md
+++ b/site2/docs/client-libraries-go.md
@@ -34,7 +34,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar
 Or you can use [dep](https://github.com/golang/dep) for managing the dependencies.
 
 ```bash
-$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}}
+$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}}
 ```
 
 Once installed locally, you can import it into your project:
diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md
index 78ea557..b5793c6 100644
--- a/site2/docs/deploy-bare-metal.md
+++ b/site2/docs/deploy-bare-metal.md
@@ -125,12 +125,12 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md
index 4090451..0dcf024 100644
--- a/site2/docs/getting-started-standalone.md
+++ b/site2/docs/getting-started-standalone.md
@@ -87,12 +87,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 042ce8e..914429b 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -69,12 +69,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
-pulsar-io-aerospike-{{pulsar.version}}.nar
-pulsar-io-cassandra-{{pulsar.version}}.nar 
-pulsar-io-kafka-{{pulsar.version}}.nar 
-pulsar-io-kinesis-{{pulsar.version}}.nar   
-pulsar-io-rabbitmq-{{pulsar.version}}.nar  
-pulsar-io-twitter-{{pulsar.version}}.nar
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
 ...
 ```
 
diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md

[GitHub] sijie commented on a change in pull request #2459: Added Contributing page on website

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2459: Added Contributing page on website
URL: https://github.com/apache/incubator-pulsar/pull/2459#discussion_r213187047
 
 

 ##
 File path: site2/website/contributing.md
 ##
 @@ -0,0 +1,250 @@
+
+The Apache Pulsar community welcomes contributions from anyone with a passion for distributed systems! Pulsar has many different opportunities for contributions --
+write new examples/tutorials, add new user-facing libraries, write new Pulsar IO connectors, or participate in the documentation effort.
+
+We use a review-then-commit workflow in Pulsar for all contributions.
+
+**For larger contributions or those that affect multiple components:**
+
+1. **Engage**: We encourage you to work with the Pulsar community on the
+   [Github Issues](https://github.com/apache/incubator-pulsar/issues) and
+   [developer’s mailing list](/contact) to identify
+   good areas for contribution.
+1. **Design:** More complicated contributions will likely benefit from some early discussion in
+   order to scope and design them well.
+
+**For all contributions:**
+
+1. **Code:** The best part ;-)
+1. **Review:** Submit a pull request with your contribution to our
+   [GitHub Repo](https://github.com/apache/incubator-pulsar). Work with a committer to review and
+   iterate on the code, if needed.
+1. **Commit:** Once at least 2 Pulsar committers have approved the pull request, a Pulsar committer
+will merge it into the master branch (and potentially backport to stable branches in case of
+bug fixes).
+
+We look forward to working with you!
+
+## Engage
+
+### Mailing list(s)
+
+We discuss design and implementation issues on the [d...@pulsar.incubator.apache.org](mailto:d...@pulsar.incubator.apache.org)
+mailing list, which is archived [here](https://lists.apache.org/list.html?d...@pulsar.apache.org).
+Join by emailing [`dev-subscr...@pulsar.incubator.apache.org`](mailto:dev-subscr...@pulsar.incubator.apache.org).
+
+If interested, you can also join the other [mailing lists](/contact).
+
+### Github Issues
+
+We are using [Github Issues](https://github.com/apache/incubator-pulsar/issues) as the issue tracking
+and project management tool, as well as a way to communicate among a very diverse and distributed set
+of contributors. To be able to gather feedback, avoid frustration, and avoid duplicated efforts all
+Pulsar related work are being tracked there.
+
+If you do not already have an Github account, sign up [here](https://github.com/join).
+
+If a quick [search](https://github.com/apache/incubator-pulsar/issues) doesn’t turn up an existing
+Github issue for the work you want to contribute, create it. Please discuss your idea with a
+committer in Github or, alternatively, on the developer mailing list.
+
+If there’s an existing Github issue for your intended contribution, please comment about your intended
+work. Once the work is understood, a committer will assign the issue to you. If an issue is currently
+assigned, please check with the current assignee before reassigning.
+
+For moderate or large contributions, you should not start coding or writing a design document unless
+there is a corresponding Github issue assigned to you for that work. Simple changes, like fixing typos,
+do not require an associated issue.
+
+### Online discussions
+
+We are using [Apache Pulsar Slack channel](https://apache-pulsar.slack.com/) for online discussions.
+You can self-invite yourself by accessing [this link](https://apache-pulsar.herokuapp.com/).
+
+Slack channels are great for quick questions or discussions on specialized topics. Remember that we
+strongly encourage communication via the mailing lists, and we prefer to discuss more complex subjects
+by email. Developers should be careful to move or duplicate all the official or useful discussions to
+the issue tracking system and/or the dev mailing list.
+
+## Design
+
+To avoid potential frustration during the code review cycle, we encourage you to clearly scope and
+design non-trivial contributions with the BookKeeper community before you start coding.
 
 Review comment:
   Pulsar community




[GitHub] sijie commented on a change in pull request #2459: Added Contributing page on website

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2459: Added Contributing page on website
URL: https://github.com/apache/incubator-pulsar/pull/2459#discussion_r213187066
 
 

 ##
 File path: site2/website/contributing.md
 ##
 @@ -0,0 +1,250 @@
+
+We are using "Pulsar Improvement Proposals" (or "PIP") for managing major changes to Pulsar. The
+list of all PIPs is maintained in the Pulsar wiki at https://github.com/apache/incubator-pulsar/wiki.
+
+## Code
+
+To contribute code to Apache Pulsar, you’ll have to do a few administrative steps once, and then
+follow the [Coding Guide](../coding_guide).
+
+### One-time Setup
+
+ [Optionally] Submit Contributor License Agreement
+
+Apache Software Foundation (ASF) desires that all contributors of ideas, code, or documentation to the Apache projects complete, sign, and submit an [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf) (ICLA). The purpose of this agreement is to clearly define the terms under which intellectual property has been contributed to the ASF and thereby allow us to defend the

[incubator-pulsar] branch master updated: [Schema] Provide a generic record interface for representing a typed message (#2452)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 757222d  [Schema] Provide a generic record interface for representing a typed message (#2452)
757222d is described below

commit 757222d9d232dbe03293df890609aa85ff63556c
Author: Sijie Guo 
AuthorDate: Mon Aug 27 22:21:27 2018 -0700

[Schema] Provide a generic record interface for representing a typed message (#2452)

* [Schema] Provide a generic record interface for representing a typed message

 ### Motivation

In some use cases, the publishers and consumers don't know the type or schema of the messages ahead of time.
For example, when a Pulsar IO connector connects a topic to a JDBC table, the connector doesn't know
the type of the messages ahead of time; it can only fetch the schema info from the schema registry, and
that is the only information the connector has. Without a generic record type, it is impossible to map
the messages to a relational database table.

So we need a way to represent a generic `Struct` record with fields.

 ### Changes

Introduce `Field` and `GenericRecord` to represent `Struct` records deserialized with a schema.

 ### Not Covered

This change only introduces the interfaces. It doesn't integrate with the producer and consumer workflow.
That will be done in subsequent changes if we agree on the interfaces.
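
To make the JDBC motivation concrete, here is a minimal sketch of how a connector might consume these interfaces once they are wired in: it builds a parameterized SQL `INSERT` purely from `getFields()` and `getField(...)`. The nested `Field` and `GenericRecord` are simplified, hypothetical stand-ins mirroring the interfaces in this change (the real `Field` is a Lombok `@Data` class with getters), and `GenericRecordSketch`, `MapBackedRecord`, `toInsertSql`, and `bindValues` are illustrative names, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GenericRecordSketch {

    // Hypothetical stand-in for org.apache.pulsar.client.api.schema.Field:
    // a field name plus its position within the record.
    static final class Field {
        final String name;
        final int index;
        Field(String name, int index) { this.name = name; this.index = index; }
    }

    // Hypothetical stand-in mirroring the GenericRecord interface from this change.
    interface GenericRecord {
        List<Field> getFields();
        Object getField(Field field);
        Object getField(String fieldName);
    }

    // Illustrative record backed by an insertion-ordered map (not a Pulsar class).
    static final class MapBackedRecord implements GenericRecord {
        private final Map<String, Object> values;
        private final List<Field> fields = new ArrayList<>();

        MapBackedRecord(LinkedHashMap<String, Object> values) {
            this.values = values;
            int i = 0;
            for (String name : values.keySet()) {
                fields.add(new Field(name, i++)); // index follows insertion order
            }
        }

        @Override public List<Field> getFields() { return fields; }
        @Override public Object getField(Field field) { return values.get(field.name); }
        @Override public Object getField(String fieldName) { return values.get(fieldName); }
    }

    // Builds a parameterized INSERT from field metadata only, so the caller
    // needs no compile-time knowledge of the message type.
    static String toInsertSql(String table, GenericRecord record) {
        List<Field> fields = record.getFields();
        String columns = fields.stream().map(f -> f.name).collect(Collectors.joining(", "));
        String placeholders = fields.stream().map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    // Collects bind values in field order so they line up with the placeholders.
    static List<Object> bindValues(GenericRecord record) {
        List<Object> out = new ArrayList<>();
        for (Field f : record.getFields()) {
            out.add(record.getField(f));
        }
        return out;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42);
        row.put("name", "pulsar");
        GenericRecord record = new MapBackedRecord(row);
        System.out.println(toInsertSql("users", record)); // INSERT INTO users (id, name) VALUES (?, ?)
        System.out.println(bindValues(record));           // [42, pulsar]
    }
}
```

This only covers flat structs; a real sink would also need to quote identifiers and map schema types to SQL column types.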
---
 .../org/apache/pulsar/client/api/schema/Field.java |  43 +
 .../pulsar/client/api/schema/GenericRecord.java|  51 +++
 .../pulsar/client/impl/schema/AvroSchema.java  |  17 ++--
 .../client/impl/schema/GenericAvroRecord.java  |  79 
 .../client/impl/schema/GenericAvroSchema.java  | 102 +
 .../pulsar/client/schema/AvroSchemaTest.java   |  47 +-
 6 files changed, 329 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
new file mode 100644
index 000..653b5d6
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, consisting of a field name, index, and
+ * {@link org.apache.pulsar.client.api.Schema} for the field value.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+/**
+ * The field name.
+ */
+private final String name;
+/**
+ * The index of the field within the record.
+ */
+private final int index;
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
new file mode 100644
index 000..0a4fce4
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;

[GitHub] sijie closed pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie closed pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
new file mode 100644
index 00..653b5d6090
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, consisting of a field name, index, and
+ * {@link org.apache.pulsar.client.api.Schema} for the field value.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+/**
+ * The field name.
+ */
+private final String name;
+/**
+ * The index of the field within the record.
+ */
+private final int index;
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
new file mode 100644
index 00..0a4fce43cb
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List<Field> getFields();
+
+/**
+ * Retrieve the value of the provided field.
+ *
+ * @param field the field to retrieve the value
+ * @return the value object
+ */
+Object getField(Field field);
+
+/**
+ * Retrieve the value of the provided fieldName.
+ *
+ * @param fieldName the field name
+ * @return the value object
+ */
+Object getField(String fieldName);
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 7d90d2bf69..6867fdcb4d 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -45,8 +45,9 @@
 private BinaryEncoder encoder;
 private ByteArrayOutputStream byteArrayOutputStream;
 
-private AvroSchema(Class pojo, Map properties) {
-this.schema = ReflectData.AllowNull.get().getSchema(pojo);
+private AvroSchema(org.apache.avro.Schema schema,
+   Map properties) {
+this.schema = schema;
 
 this.schemaInfo = new SchemaInfo();
 this.schemaInfo.setName("");
@@ -61,8 +62,7 @@ private AvroSchema(Class 

[GitHub] merlimat commented on a change in pull request #2460: Add backlog quota retention policy to server config.

2018-08-27 Thread GitBox
merlimat commented on a change in pull request #2460: Add backlog quota retention policy to server config.
URL: https://github.com/apache/incubator-pulsar/pull/2460#discussion_r213179479
 
 

 ##
 File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 ##
 @@ -86,6 +86,11 @@
 private int backlogQuotaCheckIntervalInSeconds = 60;
 // Default per-topic backlog quota limit
 private long backlogQuotaDefaultLimitGB = 50;
+//Default backlog quota retention policy. Default is producer_request_hold
+//'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
+//'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
+//'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+private String backlogQuotaRetentionPolicy = "producer_request_hold";
 
 Review comment:
   This could be set with the enum so that we can validate the value when loading the config file.
   
   Also, similar to the `backlogQuotaDefaultLimitGB`, this config var could be called `backlogQuotaDefaultRetentionPolicy`.
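
A minimal sketch of the enum suggestion: parsing the configured string through an enum rejects typos when the config file is loaded, instead of when the quota is first enforced. The constant names come from the config comment above; `BacklogPolicyConfigSketch`, `BacklogRetentionPolicy`, and `parse` are hypothetical names, and the trimming/lowercasing and null-to-default behavior are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Locale;

public class BacklogPolicyConfigSketch {

    // Hypothetical enum mirroring the three policies documented in the config comment.
    enum BacklogRetentionPolicy {
        producer_request_hold,     // hold the producer's send request until the resource is available (or it times out)
        producer_exception,        // fail the producer's send with an exception
        consumer_backlog_eviction  // evict the oldest message from the slowest consumer's backlog
    }

    // Parses a raw config value, failing fast with the list of accepted values.
    static BacklogRetentionPolicy parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            // Assumed behavior: an unset key falls back to the documented default.
            return BacklogRetentionPolicy.producer_request_hold;
        }
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        try {
            return BacklogRetentionPolicy.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid backlogQuotaDefaultRetentionPolicy '" + raw + "'; expected one of "
                    + Arrays.toString(BacklogRetentionPolicy.values()), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("producer_exception"));
        System.out.println(parse(null));
        try {
            parse("producer_reqest_hold"); // typo is rejected at load time
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Trimming and lowercasing are a leniency choice; accepting only exact constant names would be an equally valid design.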




[GitHub] codelipenghui opened a new pull request #2460: Add backlog quota retention policy to server config.

2018-08-27 Thread GitBox
codelipenghui opened a new pull request #2460: Add backlog quota retention policy to server config.
URL: https://github.com/apache/incubator-pulsar/pull/2460
 
 
   ### Motivation
   
   Enable setting the default backlog quota retention policy from the server config file (`broker.conf`).
   
   ### Modifications
   
   - Add `backlogQuotaRetentionPolicy` to `broker.conf`.
   - Add `backlogQuotaRetentionPolicy` to `ServiceConfiguration`.
   - Modify the `BacklogQuotaManager` creation logic: when `backlogQuotaRetentionPolicy` in `ServiceConfiguration` is not null, `BacklogQuotaManager` uses that value as its retention policy.
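
The fallback described in the last modification can be sketched as follows. `BrokerConfig` is a hypothetical stand-in for the relevant slice of `ServiceConfiguration`, and `DefaultQuota`, `resolveDefaultPolicy`, and `buildDefaultQuota` are hypothetical stand-ins for what `BacklogQuotaManager` builds; the built-in default is assumed to be `producer_request_hold`, matching the config comment, and the GB-to-bytes factor of 1024^3 is an assumption.

```java
public class BacklogQuotaDefaultSketch {

    enum RetentionPolicy { producer_request_hold, producer_exception, consumer_backlog_eviction }

    // Hypothetical stand-in for the relevant fields of ServiceConfiguration.
    static final class BrokerConfig {
        final long backlogQuotaDefaultLimitGB;
        final String backlogQuotaRetentionPolicy; // null when not set in broker.conf

        BrokerConfig(long limitGB, String policy) {
            this.backlogQuotaDefaultLimitGB = limitGB;
            this.backlogQuotaRetentionPolicy = policy;
        }
    }

    // Hypothetical stand-in for the default quota the manager creates.
    static final class DefaultQuota {
        final long limitBytes;
        final RetentionPolicy policy;

        DefaultQuota(long limitBytes, RetentionPolicy policy) {
            this.limitBytes = limitBytes;
            this.policy = policy;
        }
    }

    // Assumed built-in default, matching the config comment.
    static final RetentionPolicy BUILT_IN_DEFAULT = RetentionPolicy.producer_request_hold;

    // Uses the configured policy when it is non-null; otherwise falls back.
    // RetentionPolicy.valueOf throws IllegalArgumentException for unknown names.
    static RetentionPolicy resolveDefaultPolicy(BrokerConfig conf) {
        String configured = conf.backlogQuotaRetentionPolicy;
        return configured == null ? BUILT_IN_DEFAULT : RetentionPolicy.valueOf(configured);
    }

    static DefaultQuota buildDefaultQuota(BrokerConfig conf) {
        // Assumed conversion: 1 GB = 1024^3 bytes.
        long limitBytes = conf.backlogQuotaDefaultLimitGB * 1024L * 1024L * 1024L;
        return new DefaultQuota(limitBytes, resolveDefaultPolicy(conf));
    }

    public static void main(String[] args) {
        DefaultQuota configured = buildDefaultQuota(new BrokerConfig(50, "consumer_backlog_eviction"));
        System.out.println(configured.limitBytes + " " + configured.policy);
        System.out.println(buildDefaultQuota(new BrokerConfig(50, null)).policy);
    }
}
```

Keeping the null check in one place means brokers with an older `broker.conf` (no new key) keep the previous behavior.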
   




[GitHub] merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages

2018-08-27 Thread GitBox
merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
URL: https://github.com/apache/incubator-pulsar/pull/2455#issuecomment-416441172
 
 
   run cpp tests




[GitHub] merlimat commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
merlimat commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#issuecomment-416441285
 
 
   run integration tests




[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics

2018-08-27 Thread GitBox
rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416422769
 
 
   rerun cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar.wiki] branch master updated: Created Apache Maturity Model Assessment for Pulsar (markdown)

2018-08-27 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new b4cacd5  Created Apache Maturity Model Assessment for Pulsar (markdown)
b4cacd5 is described below

commit b4cacd5edd84de42d9b6c89bf6155f27d258b56f
Author: Matteo Merli 
AuthorDate: Mon Aug 27 18:11:05 2018 -0700

Created Apache Maturity Model Assessment for Pulsar (markdown)
---
 Apache-Maturity-Model-Assessment-for-Pulsar.md | 40 ++
 1 file changed, 40 insertions(+)

diff --git a/Apache-Maturity-Model-Assessment-for-Pulsar.md b/Apache-Maturity-Model-Assessment-for-Pulsar.md
new file mode 100644
index 000..2074b03
--- /dev/null
+++ b/Apache-Maturity-Model-Assessment-for-Pulsar.md
@@ -0,0 +1,40 @@
+| ID | Description | Status |
+|:---|:---|:---|
+| ***Code*** | | |
+| CD10 | The project produces Open Source software, for distribution to the public at no charge. | **YES** The project source code is licensed under the Apache License, version 2.0. |
+| CD20 | The project's code is easily discoverable and publicly accessible. | **YES** Linked from the website, available via GitBox https://gitbox.apache.org/repos/asf?p=incubator-pulsar.git and https://github.com/apache/incubator-pulsar. |
+| CD30 | The code can be built in a reproducible way using widely available standard tools. | **YES** The build uses Apache Maven for Java code and CMake for C++ code. Continuous integration is used |
+| CD40 | The full history of the project's code is available via a source code control system, in a way that allows any released version to be recreated. | **YES** All the history of the project is available through Git. All releases are properly tagged. |
+| CD50 | The provenance of each line of code is established via the source code control system, in a reliable way based on strong authentication of the committer. When third-party contributions are committed, commit messages provide reliable information about the code provenance. | **YES** The git repository is managed by Apache Infra. Only Pulsar committers have write access. All code is checked in after at least 2 committers have approved a pull-request. For 3rd party contribution [...]
+| ***Licenses and Copyright*** | | |
+| LC10 | The code is released under the Apache License, version 2.0. | **YES** Source distributions clearly state license. Convenience binaries clearly state license. |
+| LC20 | Libraries that are mandatory dependencies of the project's code do not create more restrictions than the Apache License does. | **YES** The list of mandatory dependencies have been reviewed to contain approved licenses only. |
+| LC30 | The libraries mentioned in LC20 are available as Open Source software. | **YES** All dependencies are available as open source software. |
+| LC40 | Committers are bound by an Individual Contributor Agreement (the "Apache iCLA") that defines which code they are allowed to commit and how they need to identify code that is not their own. | **YES** The project uses a repository managed by Apache Infra; write access requires an Apache account, which requires an ICLA on file. |
+| LC50 | The copyright ownership of everything that the project produces is clearly defined and documented. | **YES** Software Grant Agreement for the initial donation from Yahoo was filed. All files in the source repository have appropriate headers. Automated process is in place to ensure every file has expected headers. |
+| ***Releases*** | | |
+| RE10 | Releases consist of source code, distributed using standard and open archive formats that are expected to stay readable in the long term. | **YES** Source releases are distributed via https://dist.apache.org/repos/dist/release/incubator/pulsar/ and linked from the website at https://pulsar.incubator.apache.org/download/.
+| RE20 | Releases are approved by the project's PMC (see CS10), in order to make them an act of the Foundation. | **YES** All incubating releases have been approved by the Pulsar community with at least 3 PPMC votes and from the Incubator with 3 IPMC votes. |
+| RE30 | Releases are signed and/or distributed along with digests that can be reliably used to validate the downloaded archives. | **YES** All releases are signed, and the KEYS file is provided on dist.apache.org. |
+| RE40 | Convenience binaries can be distributed alongside source code but they are not Apache Releases -- they are just a convenience provided with no guarantee. | **YES** Convenience binaries are distributed via via dist.apache.org. Java binary artifacts are also distributed through Maven Central Repository. |
+| RE50 | The release process is documented and repeatable to the extent that someone new to

[GitHub] merlimat opened a new pull request #2459: Added Contributing page on website

2018-08-27 Thread GitBox
merlimat opened a new pull request #2459: Added Contributing page on website
URL: https://github.com/apache/incubator-pulsar/pull/2459
 
 
   ### Motivation
   
   Added a Contributing page to the website. It is inspired by and adapted from the BookKeeper contributing guide: http://bookkeeper.apache.org/community/contributing/




[incubator-pulsar] branch master updated: Remove schema validator from annotations since it requires slightly more information (#2445)

2018-08-27 Thread sanjeevrk
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b234f33  Remove schema validator from annotations since it requires slightly more information (#2445)
b234f33 is described below

commit b234f330ec6600c7ffd2201d4360182a5bd4d0b7
Author: Sanjeev Kulkarni 
AuthorDate: Mon Aug 27 17:42:35 2018 -0700

Remove schema validator from annotations since it requires slightly more information (#2445)

and is therefore done explicitly
---
 .../main/java/org/apache/pulsar/functions/utils/FunctionConfig.java| 3 ---
 .../src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java| 3 +--
 .../src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java  | 1 -
 3 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 63d8dd8..ac96fa9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,7 +29,6 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
@@ -82,8 +81,6 @@ public class FunctionConfig {
 private Map customSerdeInputs;
 @isValidTopicName
 private String topicsPattern;
-@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
 @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
 private Map customSchemaInputs;
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 6eb96c3..be886c4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -66,8 +66,7 @@ public class SinkConfig {
 @isValidTopicName
 private String topicsPattern;
 
-@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-valueValidatorClasses = { ValidatorImpls.SchemaValidator.class })
+@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class })
 private Map topicToSchemaType;
 
 private Map inputSpecs = new TreeMap<>();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index 5807b40..38e200a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -58,7 +58,6 @@ public class SourceConfig {
 @isImplementationOfClass(implementsClass = SerDe.class)
 private String serdeClassName;
 
-@isImplementationOfClass(implementsClass = Schema.class)
 private String schemaType;
 
 private Map configs;



[GitHub] srkukarni closed pull request #2445: Remove schema validator from annotations

2018-08-27 Thread GitBox
srkukarni closed pull request #2445: Remove schema validator from annotations
URL: https://github.com/apache/incubator-pulsar/pull/2445
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 63d8dd847a..ac96fa90a7 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,7 +29,6 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
@@ -82,8 +81,6 @@
 private Map customSerdeInputs;
 @isValidTopicName
 private String topicsPattern;
-@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA)
 @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON)
 private Map customSchemaInputs;
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 6eb96c39e0..be886c41ae 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -66,8 +66,7 @@
 @isValidTopicName
 private String topicsPattern;
 
-@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class },
-valueValidatorClasses = { ValidatorImpls.SchemaValidator.class })
+@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class })
 private Map topicToSchemaType;
 
 private Map inputSpecs = new TreeMap<>();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
index 5807b403d6..38e200aefb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java
@@ -58,7 +58,6 @@
 @isImplementationOfClass(implementsClass = SerDe.class)
 private String serdeClassName;
 
-@isImplementationOfClass(implementsClass = Schema.class)
 private String schemaType;
 
 private Map configs;


 




[GitHub] asfgit commented on issue #2457: Updated documentation about changed arguments for functions, sources and sinks

2018-08-27 Thread GitBox
asfgit commented on issue #2457: Updated documentation about changed arguments 
for functions, sources and sinks
URL: https://github.com/apache/incubator-pulsar/pull/2457#issuecomment-416409829
 
 
   SUCCESS 

   --none--




[GitHub] asfgit commented on issue #2453: Fixed cpp multi-topic consumer when topics are not partitioned

2018-08-27 Thread GitBox
asfgit commented on issue #2453: Fixed cpp multi-topic consumer when topics are 
not partitioned
URL: https://github.com/apache/incubator-pulsar/pull/2453#issuecomment-416408967
 
 
   SUCCESS 

   --none--




[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213151503
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 ##
 @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar
 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
 .withName(kafkaServiceName)
 .withHostName(clusterName + "-" + kafkaServiceName)));
+
 final String cassandraServiceName = "cassandra";
 externalServices.put(
 cassandraServiceName,
 new CassandraContainer(clusterName));
+
+// use mySQL for jdbc test
+final String jdbcServiceName = "jdbc";
+externalServices.put(
+jdbcServiceName,
+new MySQLContainer()
 
 Review comment:
   Thanks, this is already done in PulsarCluster




[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213151155
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
 
 Review comment:
   Thanks. We had better test each type before adding more; failing early is better. For each new type, we should also add the driver dependency to the pom file.
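The fail-early approach described in this review can be sketched as a lookup table that lists only tested driver types. This is a minimal sketch, assuming a hypothetical `DriverNames` class (not part of the PR) that throws `IllegalArgumentException` instead of the PR's plain `Exception`; the two driver class names are the ones in the quoted diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the PR) illustrating a fail-early driver lookup.
public class DriverNames {
    // Only driver types that have been tested, and whose JDBC driver
    // dependency is declared in the pom, belong in this table.
    private static final Map<String, String> TESTED_DRIVERS;

    static {
        Map<String, String> m = new HashMap<>();
        m.put("mysql", "com.mysql.jdbc.Driver");
        m.put("sqlite", "org.sqlite.JDBC");
        TESTED_DRIVERS = Collections.unmodifiableMap(m);
    }

    public static String getDriverClassName(String driver) {
        String className = TESTED_DRIVERS.get(driver);
        if (className == null) {
            // Fail early rather than guessing a class name for an untested type.
            throw new IllegalArgumentException("Not tested jdbc driver type: " + driver);
        }
        return className;
    }
}
```

A table also sidesteps the `} if (` chain in the quoted code, where the missing `else` is harmless only because every branch returns or throws; supporting a new type then means one tested entry plus its pom dependency.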




[GitHub] rdhabalia removed a comment on issue #2438: Fix: Function assignment can support large number of topics

2018-08-27 Thread GitBox
rdhabalia removed a comment on issue #2438: Fix: Function assignment can 
support large number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416407418
 
 
   rerun cpp tests




[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics

2018-08-27 Thread GitBox
rdhabalia commented on issue #2438: Fix: Function assignment can support large 
number of topics
URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416407418
 
 
   rerun cpp tests




[GitHub] merlimat opened a new pull request #2458: Fixed few links in contact and download pages

2018-08-27 Thread GitBox
merlimat opened a new pull request #2458: Fixed few links in contact and 
download pages
URL: https://github.com/apache/incubator-pulsar/pull/2458
 
 
   ### Motivation
   
   Fixed a few links in the contact and download pages.




[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213150850
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

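The quoted `write()` appends to `incomingList` under a lock and triggers a flush once the list reaches `batchSize`, while `open()` also schedules `flush()` every `timeoutMs`; the `swapList` and `isFlushing` fields suggest a double-buffer scheme. The quote is cut off before `flush()`, so the following is only a minimal sketch of how such a flush could work, assuming a hypothetical `BatchBuffer` class and a caller-supplied `Consumer<List<T>>` standing in for the JDBC batch insert. It is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical double-buffered batcher; `sink` stands in for the JDBC batch insert.
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> incomingList = new ArrayList<>(); // appended to by write()
    private List<T> swapList = new ArrayList<>();     // drained by flush()
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final ScheduledExecutorService flushExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public BatchBuffer(int batchSize, long timeoutMs, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
        // Time-based trigger: flush every timeoutMs even if the batch is not full.
        flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public void write(T record) {
        int number;
        synchronized (this) {
            incomingList.add(record);
            number = incomingList.size();
        }
        // Size-based trigger: hand the flush to the executor so write() does not block.
        if (number == batchSize) {
            flushExecutor.execute(this::flush);
        }
    }

    void flush() {
        // Only one flush at a time; a concurrent trigger simply returns.
        if (!isFlushing.compareAndSet(false, true)) {
            return;
        }
        try {
            // Swap the buffers under the lock so writers can keep appending.
            synchronized (this) {
                List<T> tmp = swapList;
                swapList = incomingList;
                incomingList = tmp;
            }
            // Write the batch outside the lock (retry and error handling omitted).
            if (!swapList.isEmpty()) {
                sink.accept(new ArrayList<>(swapList));
                swapList.clear();
            }
        } finally {
            isFlushing.set(false);
        }
    }

    // Stop the timer, let an already-submitted flush finish, then drain the rest.
    public void close() {
        flushExecutor.shutdown();
        try {
            flushExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush();
    }
}
```

Swapping lists keeps the critical section down to a few pointer assignments, so producers calling `write()` are never blocked for the duration of a database round trip.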
[GitHub] srkukarni opened a new pull request #2457: Updated documentation about changed arguments for functions, sources and sinks

2018-08-27 Thread GitBox
srkukarni opened a new pull request #2457: Updated documentation about changed 
arguments for functions, sources and sinks
URL: https://github.com/apache/incubator-pulsar/pull/2457
 
 
   
   ### Motivation
   
   We recently changed the argument style of functions, sources and sinks to match the rest of the Pulsar CLI. This PR updates the documentation accordingly.
   
   ### Modifications
   
   Describe the modifications you've done.
   
   ### Result
   
   After your change, what will change.
   




[GitHub] merlimat closed pull request #2448: Add multi-topic and regex consumer in Go client

2018-08-27 Thread GitBox
merlimat closed pull request #2448: Add multi-topic and regex consumer in Go 
client
URL: https://github.com/apache/incubator-pulsar/pull/2448
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6de92..4b603bbff3 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx);
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+const char *subscriptionName,
+const pulsar_consumer_configuration_t *conf,
+pulsar_subscribe_callback callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+   const char *subscriptionName,
+   const pulsar_consumer_configuration_t *conf,
+   pulsar_subscribe_callback callback, void *ctx);
+
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
  * topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e41067f..ecefe20c03 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
boost::bind(_subscribe_callback, _1, _2, callback, ctx));
 }
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+const char *subscriptionName,
+const pulsar_consumer_configuration_t *conf,
+pulsar_subscribe_callback callback, void *ctx) {
+std::vector topicsList;
+for (int i = 0; i < topicsCount; i++) {
+topicsList.push_back(topics[i]);
+}
+
+client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
+   boost::bind(_subscribe_callback, _1, _2, callback, ctx));
+}
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+   const char *subscriptionName,
+   const pulsar_consumer_configuration_t *conf,
+   pulsar_subscribe_callback callback, void *ctx) {
+client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
+boost::bind(_subscribe_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
   const pulsar_message_id_t *startMessageId,
   pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index aebabc877d..093dd9dc6e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
 import "C"
 
 import (
+   "context"
"runtime"
"time"
"unsafe"
-   "context"
 )
 
 type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
 }
 
 func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
-   if options.Topic == "" {
+   if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
}
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_consumer_name(conf, name)
}
 
-   topic := C.CString(options.Topic)
subName := C.CString(options.SubscriptionName)
-   defer C.free(unsafe.Pointer(topic))
defer C.free(unsafe.Pointer(subName))
-   C._pulsar_client_subscribe_async(client.ptr, topic, 

[GitHub] merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages

2018-08-27 Thread GitBox
merlimat commented on issue #2455: Fixed {{pulsar.version}} into 
{{pulsar:version}} in few pages
URL: https://github.com/apache/incubator-pulsar/pull/2455#issuecomment-416401591
 
 
   run java8 tests




[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138309
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
 
 Review comment:
   nit: log.info("Open jdbc connection : {}", jdbcUrl);




[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139143
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink implements Sink {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List> incomingList;
+private List> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map config, SinkContext sinkContext) throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 

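In the quoted `close()`, `connection.getAutoCommit()` is called before the `connection != null` check, and the flush executor is shut down without waiting for a pending flush or writing records still buffered in `incomingList`. A minimal sketch of one defensive ordering follows, assuming a hypothetical `SinkCloser` helper and a caller-supplied `finalFlush` callback; neither is part of the PR, and the 10-second wait is an arbitrary choice.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not from the PR) showing one defensive close() ordering.
public final class SinkCloser {
    private SinkCloser() {}

    public static void close(ExecutorService flushExecutor, Runnable finalFlush,
                             Connection connection) throws SQLException {
        // 1. Stop scheduling new flushes and let an in-flight flush finish
        //    (the 10 second bound is an arbitrary assumption).
        if (flushExecutor != null) {
            flushExecutor.shutdown();
            try {
                flushExecutor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // 2. Write any records that are still buffered.
        if (finalFlush != null) {
            finalFlush.run();
        }
        // 3. Null-check before touching the connection; commit if needed, always close.
        if (connection != null) {
            try {
                if (!connection.getAutoCommit()) {
                    connection.commit();
                }
            } finally {
                connection.close();
            }
        }
    }
}
```

The `try`/`finally` ensures the connection is released even if the final `commit()` fails.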
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139185
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink<T> implements Sink<T> {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List<Record<T>> incomingList;
+private List<Record<T>> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
+}
+
+@Override
+public void write(Record<T> record) throws Exception {
+int number;
+synchronized (incomingList) {
+incomingList.add(record);
+number = incomingList.size();
+}
+
+if (number == batchSize) {
+flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+}
+}
+
+// bind value with a 
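The hunk is cut off before `flush()` appears, so its body is not shown in this message. The fields `incomingList`, `swapList` and `isFlushing` suggest a swap-list batching scheme. Here is a minimal sketch of that idea under that assumption. `SwapBuffer` is a hypothetical class, and a `Consumer` stands in for the JDBC batch insert:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper, not part of the PR. Records accumulate in `incoming`;
// flush() swaps the two lists under the lock and writes the batch outside it.
final class SwapBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> batchWriter; // stands in for the JDBC batch insert
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private List<T> incoming = new ArrayList<>();
    private List<T> swap = new ArrayList<>();

    SwapBuffer(int batchSize, Consumer<List<T>> batchWriter) {
        this.batchSize = batchSize;
        this.batchWriter = batchWriter;
    }

    /** Adds a record; returns true when the caller should schedule a flush. */
    boolean add(T record) {
        synchronized (this) {
            incoming.add(record);
            return incoming.size() >= batchSize;
        }
    }

    /** Writes everything buffered so far; a call that overlaps a running flush returns at once. */
    void flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another thread is already flushing
        }
        try {
            synchronized (this) {
                List<T> tmp = incoming;
                incoming = swap;
                swap = tmp;
            }
            if (!swap.isEmpty()) {
                // The writer must not keep a reference: the list is cleared next.
                batchWriter.accept(swap);
                swap.clear();
            }
        } finally {
            isFlushing.set(false);
        }
    }
}
```

Returning a boolean from `add` keeps scheduling with the caller, e.g. `if (buffer.add(record)) { flushExecutor.execute(buffer::flush); }`. Because an overlapping `flush()` returns immediately in this sketch, `add` uses `>=` rather than `==`. This way, a buffer that passed `batchSize` while a flush was in progress keeps requesting one.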

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139561
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List<ColumnId> columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
+}
+}
+
+/**
+ * Get the {@link Connection} for the given jdbcUrl.
+ */
+public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception {
+String driver = jdbcUrl.split(":")[1];
+String driverClassName = getDriverClassName(driver);
+Class.forName(driverClassName);
+
+return DriverManager.getConnection(jdbcUrl, properties);
+}
+
+/**
+ * Get the {@link TableId} for the given tableName.
+ */
+public static TableId getTableId(Connection connection, String tableName) throws Exception {
+DatabaseMetaData metadata = connection.getMetaData();
+try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+if (rs.next()) {
+String catalogName = rs.getString(1);
+String schemaName = rs.getString(2);
+String gotTableName = rs.getString(3);
+checkState(tableName.equals(gotTableName),
+"TableName not match: " + tableName + " Got: " + gotTableName);
+log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
 
 Review comment:
   if (log.isDebugEnabled())
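The suggestion is the usual guard around debug logging. slf4j's `{}` placeholders already defer building the message string, but the arguments are still evaluated, and with three or more of them a varargs array is allocated. A guard therefore helps on hot paths or when an argument is costly to compute. Below is a sketch of the pattern using `java.util.logging`, where `isLoggable(Level.FINE)` plays the role of slf4j's `isDebugEnabled()`, so that it runs without slf4j on the classpath. `describeTable` is a hypothetical stand-in for an expensive argument:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical example class; FINE is the java.util.logging level closest to DEBUG.
final class GuardedDebugLog {
    private static final Logger log = Logger.getLogger(GuardedDebugLog.class.getName());
    static int describeCalls = 0; // counts how often the costly argument is built

    // Stand-in for an argument that is expensive to compute.
    static String describeTable(String catalog, String schema, String table) {
        describeCalls++;
        return catalog + "." + schema + "." + table;
    }

    static void logTableFound(String catalog, String schema, String table) {
        // Same idea as slf4j's `if (log.isDebugEnabled())`: skip all work when disabled.
        if (log.isLoggable(Level.FINE)) {
            log.fine("Get Table: " + describeTable(catalog, schema, table));
        }
    }

    static void setLevel(Level level) {
        log.setLevel(level);
    }
}
```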


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140305
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 ##
 @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar
 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
 .withName(kafkaServiceName)
 .withHostName(clusterName + "-" + kafkaServiceName)));
+
 final String cassandraServiceName = "cassandra";
 externalServices.put(
 cassandraServiceName,
 new CassandraContainer(clusterName));
+
+// use mySQL for jdbc test
+final String jdbcServiceName = "jdbc";
+externalServices.put(
+jdbcServiceName,
+new MySQLContainer()
 
 Review comment:
   attach clusterName to the hostname
   
   ```
   new MySQLContainer()
       .withNetworkAliases(jdbcServiceName)
       .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
           .withName(jdbcServiceName)
           .withHostName(clusterName + "-" + jdbcServiceName)));
   ```


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138387
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 ##
 @@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink<T> implements Sink<T> {
+// - Runtime fields
+private JdbcSinkConfig jdbcSinkConfig;
+@Getter
+private Connection connection;
+private String tableName;
+
+private JdbcUtils.TableId tableId;
+private PreparedStatement insertStatement;
+
+// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+protected String schema;
+protected JdbcUtils.TableDefinition tableDefinition;
+
+// for flush
+private List<Record<T>> incomingList;
+private List<Record<T>> swapList;
+private AtomicBoolean isFlushing;
+private int timeoutMs;
+private int batchSize;
+private ScheduledExecutorService flushExecutor;
+
+@Override
+public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+String jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+if (jdbcSinkConfig.getJdbcUrl() == null) {
+throw new IllegalArgumentException("Required jdbc Url not set.");
+}
+
+Properties properties = new Properties();
+String username = jdbcSinkConfig.getUserName();
+String password = jdbcSinkConfig.getPassword();
+if (username != null) {
+properties.setProperty("user", username);
+}
+if (password != null) {
+properties.setProperty("password", password);
+}
+
+connection = JdbcUtils.getConnection(jdbcUrl, properties);
+log.info("Connection opened");
+
+schema = jdbcSinkConfig.getSchema();
+tableName = jdbcSinkConfig.getTableName();
+tableId = JdbcUtils.getTableId(connection, tableName);
+tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+timeoutMs = jdbcSinkConfig.getTimeoutMs();
+batchSize = jdbcSinkConfig.getBatchSize();
+incomingList = Lists.newArrayList();
+swapList = Lists.newArrayList();
+isFlushing = new AtomicBoolean(false);
+
+flushExecutor = Executors.newScheduledThreadPool(1);
+flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void close() throws Exception {
+if (!connection.getAutoCommit()) {
+connection.commit();
+}
+flushExecutor.shutdown();
+if (connection != null) {
+connection.close();
+}
+log.info("Connection Closed");
 
 Review comment:
   nit: log.info("Closed jdbc connection : {}", jdbcUrl);
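Besides the log message, note that the quoted `close()` calls `connection.getAutoCommit()` before its `connection != null` check, so it would throw if `open()` failed before connecting. The sketch below shows a null-safe ordering that also puts the URL in the log line. It assumes `java.util.logging` in place of slf4j, so the placeholder is `{0}` rather than `{}`. It also assumes a hypothetical static helper `JdbcSinkCloser.close`; the 5-second wait is an arbitrary choice:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; any argument may be null if open() failed part-way.
final class JdbcSinkCloser {
    private static final Logger log = Logger.getLogger(JdbcSinkCloser.class.getName());

    static void close(Connection connection, ExecutorService flushExecutor, String jdbcUrl)
            throws SQLException, InterruptedException {
        if (flushExecutor != null) {
            // Stop scheduling new flushes and give an in-flight one time to finish.
            flushExecutor.shutdown();
            flushExecutor.awaitTermination(5, TimeUnit.SECONDS); // timeout is an assumption
        }
        if (connection != null) {
            // The null check comes before any call on the connection.
            if (!connection.getAutoCommit()) {
                connection.commit();
            }
            connection.close();
        }
        log.log(Level.INFO, "Closed jdbc connection : {0}", jdbcUrl);
    }
}
```

In the sink itself, `jdbcUrl` is a local variable of `open()`, so it would need to be kept in a field for `close()` to log it.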


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139604
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List<ColumnId> columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
+}
+}
+
+/**
+ * Get the {@link Connection} for the given jdbcUrl.
+ */
+public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception {
+String driver = jdbcUrl.split(":")[1];
+String driverClassName = getDriverClassName(driver);
+Class.forName(driverClassName);
+
+return DriverManager.getConnection(jdbcUrl, properties);
+}
+
+/**
+ * Get the {@link TableId} for the given tableName.
+ */
+public static TableId getTableId(Connection connection, String tableName) throws Exception {
+DatabaseMetaData metadata = connection.getMetaData();
+try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+if (rs.next()) {
+String catalogName = rs.getString(1);
+String schemaName = rs.getString(2);
+String gotTableName = rs.getString(3);
+checkState(tableName.equals(gotTableName),
+"TableName not match: " + tableName + " Got: " + gotTableName);
+log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
+return TableId.of(catalogName, schemaName, tableName);
+} else {
+throw new Exception("Not able to find table: " + tableName);
+}
+}
+}
+
+/**
+ * Get the {@link TableDefinition} for the given table.
+ */
+public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception {
+TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList());
+
+try (ResultSet rs = connection.getMetaData().getColumns(
+tableId.getCatalogName(),
+tableId.getSchemaName(),
+ 
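In the quoted `getConnection`, `jdbcUrl.split(":")[1]` assumes the URL has the form `jdbc:<subprotocol>:...`. A URL without a colon (for example `mysql`) fails with an `ArrayIndexOutOfBoundsException` rather than a readable error. A sketch of a stricter extraction, with `JdbcUrls.subprotocol` as a hypothetical helper name:

```java
// Hypothetical helper: extracts "mysql" from "jdbc:mysql://host:3306/db".
final class JdbcUrls {
    private static final String PREFIX = "jdbc:";

    static String subprotocol(String jdbcUrl) {
        if (jdbcUrl == null || !jdbcUrl.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        // The subprotocol runs from just after "jdbc:" up to the next ':'.
        int end = jdbcUrl.indexOf(':', PREFIX.length());
        if (end <= PREFIX.length()) {
            throw new IllegalArgumentException("Missing subprotocol in JDBC URL: " + jdbcUrl);
        }
        return jdbcUrl.substring(PREFIX.length(), end);
    }
}
```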

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139256

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139527
 
 

 ##
 File path: 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
 ##
 @@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableId {
+private final String catalogName;
+private final String schemaName;
+private final String tableName;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class ColumnId {
+private final TableId tableId;
+private final String name;
+// SQL type from java.sql.Types
+private final int type;
+private final String typeName;
+// column position in table
+private final int position;
+}
+
+@Data(staticConstructor = "of")
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+public static class TableDefinition {
+private final TableId tableId;
+private final List<ColumnId> columns;
+}
+
+/**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types
+ */
+public static String getDriverClassName(String driver) throws Exception {
+if (driver.equals("mysql")) {
+return "com.mysql.jdbc.Driver";
+} if (driver.equals("sqlite")) {
+return "org.sqlite.JDBC";
+} else {
+throw new Exception("Not tested jdbc driver type: " + driver);
 
 Review comment:
   probably log a warning message rather than failing it?
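One way to follow this suggestion is to keep a table of tested drivers, warn on anything else, and skip the explicit `Class.forName` in that case. Since JDBC 4.0, `DriverManager` discovers drivers on the classpath through `java.util.ServiceLoader`, so an untested driver can still connect if its jar is present. A map lookup also replaces the `} if (` chain in the quoted version. A sketch, assuming `java.util.logging` in place of slf4j and a hypothetical `DriverLookup` class:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical rework of getDriverClassName/getConnection: warn instead of failing.
final class DriverLookup {
    private static final Logger log = Logger.getLogger(DriverLookup.class.getName());
    private static final Map<String, String> TESTED_DRIVERS = new HashMap<>();

    static {
        TESTED_DRIVERS.put("mysql", "com.mysql.jdbc.Driver");
        TESTED_DRIVERS.put("sqlite", "org.sqlite.JDBC");
    }

    /** Returns the driver class for a tested type, or null after logging a warning. */
    static String getDriverClassName(String driver) {
        String className = TESTED_DRIVERS.get(driver);
        if (className == null) {
            log.warning("Not tested jdbc driver type: " + driver
                    + "; relying on DriverManager to find a driver");
        }
        return className;
    }

    static Connection getConnection(String jdbcUrl, String driver, Properties properties)
            throws ClassNotFoundException, SQLException {
        String className = getDriverClassName(driver);
        if (className != null) {
            Class.forName(className); // explicit load only for tested drivers
        }
        // Other types fall back to DriverManager's ServiceLoader-based discovery.
        return DriverManager.getConnection(jdbcUrl, properties);
    }
}
```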


[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138978

[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC 
sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140171
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 ##
 @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar
 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
 .withName(kafkaServiceName)
 .withHostName(clusterName + "-" + kafkaServiceName)));
+
 final String cassandraServiceName = "cassandra";
 externalServices.put(
 cassandraServiceName,
 new CassandraContainer(clusterName));
+
+// use mySQL for jdbc test
+final String jdbcServiceName = "jdbc";
 
 Review comment:
   jdbcServiceName = "mysql"


[GitHub] sijie commented on issue #2406: Fix flake in DiscoveryServiceTest (#1081)

2018-08-27 Thread GitBox
sijie commented on issue #2406: Fix flake in DiscoveryServiceTest (#1081)
URL: https://github.com/apache/incubator-pulsar/pull/2406#issuecomment-416392332
 
 
   retest this please


[GitHub] sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context

2018-08-27 Thread GitBox
sijie commented on issue #2411: [functions] change instance id from string to 
int and expose number of instances in context
URL: https://github.com/apache/incubator-pulsar/pull/2411#issuecomment-416392265
 
 
   Taking this out of 2.2; will revisit it for 2.3.




[GitHub] aahmed-se opened a new pull request #2456: Remove Aerospike Connector

2018-08-27 Thread GitBox
aahmed-se opened a new pull request #2456: Remove Aerospike Connector
URL: https://github.com/apache/incubator-pulsar/pull/2456
 
 
   Remove Aerospike connector due to license considerations of dependent 
libraries.
   




[GitHub] merlimat opened a new pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages

2018-08-27 Thread GitBox
merlimat opened a new pull request #2455: Fixed {{pulsar.version}} into 
{{pulsar:version}} in few pages
URL: https://github.com/apache/incubator-pulsar/pull/2455
 
 
   ### Motivation
   
   In a few places on the website we're displaying the literal `{{pulsar.version}}` 
instead of the actual Pulsar version string. The correct variable name uses `:` 
as the separator.




[GitHub] sijie commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie commented on issue #2452: [Schema] Provide a generic record interface for 
representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#issuecomment-416346124
 
 
   @merlimat I have addressed your comments. PTAL




[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2452: [Schema] Provide a generic 
record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213092862
 
 

 ##
 File path: 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
 ##
 @@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, consisting of a field name, index, and
+ * {@link org.apache.pulsar.client.api.Schema} for the field value.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+private final String name;
 
 Review comment:
   done




[GitHub] sijie opened a new pull request #2454: Fix compilation issue on CLITest

2018-08-27 Thread GitBox
sijie opened a new pull request #2454: Fix compilation issue on CLITest
URL: https://github.com/apache/incubator-pulsar/pull/2454
 
 
### Motivation
   
#2182 was cherry-picked into branch-2.1, but some of its integration test 
changes were not cherry-picked along with it, so compilation is broken.
   
### Changes
   
Fix the compilation issue on CLITest
   
   




[GitHub] merlimat commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
merlimat commented on a change in pull request #2452: [Schema] Provide a 
generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213078989
 
 

 ##
 File path: 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 ##
 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List getFields();
 
 Review comment:
   Sure, we can do the minimum required by JDBC now and continue later. Just make 
sure the API can evolve gracefully when nested objects are added.




[GitHub] sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty

2018-08-27 Thread GitBox
sijie commented on issue #2372: Kinesis-sink consider topic-name as 
partition-key if record key empty
URL: https://github.com/apache/incubator-pulsar/pull/2372#issuecomment-416328437
 
 
   Actually, this change is related to a bunch of other changes, so I'm not going to 
merge it to branch-2.1.




[incubator-pulsar] branch branch-2.1 updated: Revert "Kinesis-sink consider topic-name as partition-key if record key empty (#2372)"

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new 9ede4c8  Revert "Kinesis-sink consider topic-name as partition-key if 
record key empty (#2372)"
9ede4c8 is described below

commit 9ede4c80802626065f70fb5e20b81bafb6f33969
Author: Sijie Guo 
AuthorDate: Mon Aug 27 11:47:56 2018 -0700

Revert "Kinesis-sink consider topic-name as partition-key if record key 
empty (#2372)"

This reverts commit 703305b5426856bab7bab30a41e4f242e7782dc7.
---
 .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index cdfadde..67de21a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -114,7 +114,7 @@ public class KinesisSink implements Sink {
 record.getRecordSequence());
 throw new IllegalStateException("kinesis queue has publish failure");
 }
-String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
+String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
 partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
 : partitionedKey; // partitionedKey Length must be at least one, and at most 256

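The revert restores the original key selection: use the record key if present, otherwise `defaultPartitionedKey`, then cap the length. A minimal sketch of that logic as a standalone helper follows; `PartitionKeys.resolve` and its parameters are hypothetical names, not part of the sink. One design note: the diff trims an over-long key with `substring(0, maxPartitionedKeyLength - 1)`, which leaves one character fewer than the limit, whereas this sketch trims to the limit itself, which still satisfies the "at most 256" rule stated in the diff comment.

```java
import java.util.Optional;

/**
 * Sketch of partition-key selection for a Kinesis-style sink (hypothetical helper).
 * Uses the record key when present, otherwise a configured default, then caps the length.
 */
final class PartitionKeys {
    private PartitionKeys() {}

    static String resolve(Optional<String> recordKey, String defaultKey, int maxLength) {
        String key = recordKey.orElse(defaultKey);
        // substring(0, maxLength) keeps exactly maxLength characters of an over-long key.
        return key.length() > maxLength ? key.substring(0, maxLength) : key;
    }
}
```

For example, `resolve(Optional.empty(), "default-key", 256)` returns `"default-key"`.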


[GitHub] srkukarni commented on issue #2451: Issue 2312: add python client multi-topics consumer support

2018-08-27 Thread GitBox
srkukarni commented on issue #2451: Issue 2312: add python client multi-topics 
consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#issuecomment-416327192
 
 
   Once #2448 is in, I will add support for functions as well. 




[GitHub] sijie commented on issue #2450: Added Reader.HasNext in Go client

2018-08-27 Thread GitBox
sijie commented on issue #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416326440
 
 
   merged as b6dc7c1 in branch-2.1




[incubator-pulsar] branch branch-2.1 updated: Added Reader.HasNext in Go client (#2450)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new b6dc7c1  Added Reader.HasNext in Go client (#2450)
b6dc7c1 is described below

commit b6dc7c1eae314e65cf782a7b0201408279dc9e8d
Author: Matteo Merli 
AuthorDate: Mon Aug 27 11:41:24 2018 -0700

Added Reader.HasNext in Go client (#2450)

### Motivation

Added `Reader.HasNext()` in Go client library
---
 pulsar-client-go/pulsar/c_reader.go| 13 +
 pulsar-client-go/pulsar/reader.go  |  3 +++
 pulsar-client-go/pulsar/reader_test.go | 12 ++--
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b8..12c1103 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
}
 }
 
+func (r *reader) HasNext() (bool, error) {
+   value := C.int(0)
+   res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+   if res != C.pulsar_result_Ok {
+   return false, newError(res, "Failed to check if next message is available")
+   } else if value == C.int(1) {
+   return true, nil
+   } else {
+   return false, nil
+   }
+}
+
 func (r *reader) Close() error {
channel := make(chan error)
r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7..7015c9c 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
// Read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
 
+   // Check if there is any message available to read from the current position
+   HasNext() (bool, error)
+
// Close the reader and stop the broker to push more messages
Close() error
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b36..3b075e1 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-   "testing"
-   "fmt"
"context"
+   "fmt"
+   "testing"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
 
+   hasNext, err := reader.HasNext()
+   assertNil(t, err)
+   assertEqual(t, hasNext, true)
+
msg, err := reader.Next(ctx)
assertNil(t, err)
assertNotNil(t, msg)
 
assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
}
+
+   hasNext, err := reader.HasNext()
+   assertNil(t, err)
+   assertEqual(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {
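`HasNext()` lets a caller check whether a message is available before calling `Next()`, which otherwise blocks; the updated test uses it to assert that the reader is exhausted after the last `hello-N` message. Below is a minimal Java sketch of the same check-then-read loop. `SimpleReader`, `InMemoryReader`, and `ReaderDrain` are hypothetical stand-ins, not Pulsar client classes, and the sketch omits the `error` result that the Go method also returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical minimal reader contract mirroring Next()/HasNext() (errors omitted). */
interface SimpleReader {
    /** Returns true if a message is available from the current position. */
    boolean hasNext();

    /** Returns the next message; a real reader would block until one arrives. */
    String next();
}

/** Hypothetical in-memory reader used only to exercise the loop below. */
final class InMemoryReader implements SimpleReader {
    private final Deque<String> pending;

    InMemoryReader(List<String> messages) {
        this.pending = new ArrayDeque<>(messages);
    }

    @Override
    public boolean hasNext() {
        return !pending.isEmpty();
    }

    @Override
    public String next() {
        return pending.removeFirst();
    }
}

final class ReaderDrain {
    private ReaderDrain() {}

    /** Reads everything currently available and returns instead of blocking on next(). */
    static List<String> drain(SimpleReader reader) {
        List<String> out = new ArrayList<>();
        while (reader.hasNext()) {
            out.add(reader.next());
        }
        return out;
    }
}
```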



[incubator-pulsar] branch master updated: Added Reader.HasNext in Go client (#2450)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 73bc19d  Added Reader.HasNext in Go client (#2450)
73bc19d is described below

commit 73bc19dc41b6a06bd7051216682cc5d2fc5d5592
Author: Matteo Merli 
AuthorDate: Mon Aug 27 11:41:24 2018 -0700

Added Reader.HasNext in Go client (#2450)

### Motivation

Added `Reader.HasNext()` in Go client library
---
 pulsar-client-go/pulsar/c_reader.go| 13 +
 pulsar-client-go/pulsar/reader.go  |  3 +++
 pulsar-client-go/pulsar/reader_test.go | 12 ++--
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b8..12c1103 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
}
 }
 
+func (r *reader) HasNext() (bool, error) {
+   value := C.int(0)
+   res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+   if res != C.pulsar_result_Ok {
+   return false, newError(res, "Failed to check if next message is available")
+   } else if value == C.int(1) {
+   return true, nil
+   } else {
+   return false, nil
+   }
+}
+
 func (r *reader) Close() error {
channel := make(chan error)
r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7..7015c9c 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
// Read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
 
+   // Check if there is any message available to read from the current position
+   HasNext() (bool, error)
+
// Close the reader and stop the broker to push more messages
Close() error
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b36..3b075e1 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-   "testing"
-   "fmt"
"context"
+   "fmt"
+   "testing"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
 
+   hasNext, err := reader.HasNext()
+   assertNil(t, err)
+   assertEqual(t, hasNext, true)
+
msg, err := reader.Next(ctx)
assertNil(t, err)
assertNotNil(t, msg)
 
assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
}
+
+   hasNext, err := reader.HasNext()
+   assertNil(t, err)
+   assertEqual(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {



[incubator-pulsar] branch branch-2.1 updated: Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new 2595941  Issue #2330: change getTopicName in MultiTopicsConsumer 
(#2346)
2595941 is described below

commit 259594165f5ee3c69a29629534af81011cab4ef1
Author: Jia Zhai 
AuthorDate: Tue Aug 28 02:38:41 2018 +0800

Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)

* change getTopicName in MultiTopicsConsumer

* change following sijie's comments

* keep both topicName and topicPartitionName in consumer to avoid new string
---
 .../apache/pulsar/client/impl/ConsumerImpl.java|  8 +++
 .../client/impl/MultiTopicsConsumerImpl.java   |  9 
 .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++
 .../pulsar/client/impl/TopicMessageImpl.java   | 20 
 .../client/impl/UnAckedTopicMessageTracker.java|  4 ++--
 .../pulsar/client/impl/MessageIdCompareToTest.java |  6 -
 .../tests/integration/semantics/SemanticsTest.java |  2 +-
 7 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1a7b67b..fe0e2c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 private final SubscriptionInitialPosition subscriptionInitialPosition;
 private final ConnectionHandler connectionHandler;
 
+private final String topicNameWithoutPartition;
+
 enum SubscriptionMode {
 // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
 // position
@@ -203,6 +205,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 NonPersistentAcknowledgmentGroupingTracker.of();
 }
 
+topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
 grabCnx();
 }
 
@@ -1458,6 +1462,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 this.connectionHandler.grabCnx();
 }
 
+public String getTopicNameWithoutPartition() {
+return topicNameWithoutPartition;
+}
+
 private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index bbfb3f3..6d5d56d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -230,7 +230,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 checkArgument(message instanceof MessageImpl);
 lock.writeLock().lock();
 try {
-TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message);
+TopicMessageImpl topicMessage = new TopicMessageImpl<>(
+consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 unAckedMessageTracker.add(topicMessage.getMessageId());
 
 if (log.isDebugEnabled()) {
@@ -369,7 +370,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 }
 
 if (ackType == AckType.Cumulative) {
-Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
+Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
 if (individualConsumer != null) {
 MessageId innerId = topicMessageId.getInnerMessageId();
 return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -377,7 +378,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
 }
 } else {
-ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName());
+ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName());
 
 MessageId innerId = topicMessageId.getInnerMessageId();
 return consumer.doAcknowledge(innerId, ackType, properties)
@@ -510,7 +511,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 }
 removeExpiredMessagesFromQueue(messageIds);
 messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
-

[GitHub] sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer

2018-08-27 Thread GitBox
sijie commented on issue #2346: Issue #2330: change getTopicName in 
MultiTopicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/2346#issuecomment-416325631
 
 
   merged as 2595941 into branch-2.1




[incubator-pulsar] branch master updated: Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8688b67  Issue #2330: change getTopicName in MultiTopicsConsumer 
(#2346)
8688b67 is described below

commit 8688b6733177ee4a48d805df18a1d82865cf7320
Author: Jia Zhai 
AuthorDate: Tue Aug 28 02:38:41 2018 +0800

Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)

* change getTopicName in MultiTopicsConsumer

* change following sijie's comments

* keep both topicName and topicPartitionName in consumer to avoid new string
---
 .../apache/pulsar/client/impl/ConsumerImpl.java|  8 +++
 .../client/impl/MultiTopicsConsumerImpl.java   |  9 
 .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++
 .../pulsar/client/impl/TopicMessageImpl.java   | 20 
 .../client/impl/UnAckedTopicMessageTracker.java|  4 ++--
 .../pulsar/client/impl/MessageIdCompareToTest.java |  6 -
 .../tests/integration/semantics/SemanticsTest.java |  2 +-
 7 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index da04534..fe37b69 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 private final SubscriptionInitialPosition subscriptionInitialPosition;
 private final ConnectionHandler connectionHandler;
 
+private final String topicNameWithoutPartition;
+
 enum SubscriptionMode {
 // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
 // position
@@ -203,6 +205,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 NonPersistentAcknowledgmentGroupingTracker.of();
 }
 
+topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
 grabCnx();
 }
 
@@ -1458,6 +1462,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
 this.connectionHandler.grabCnx();
 }
 
+public String getTopicNameWithoutPartition() {
+return topicNameWithoutPartition;
+}
+
 private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 799bec8..f1cb9cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -231,7 +231,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 checkArgument(message instanceof MessageImpl);
 lock.writeLock().lock();
 try {
-TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message);
+TopicMessageImpl topicMessage = new TopicMessageImpl<>(
+consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 unAckedMessageTracker.add(topicMessage.getMessageId());
 
 if (log.isDebugEnabled()) {
@@ -370,7 +371,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 }
 
 if (ackType == AckType.Cumulative) {
-Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
+Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
 if (individualConsumer != null) {
 MessageId innerId = topicMessageId.getInnerMessageId();
 return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -378,7 +379,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
 }
 } else {
-ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName());
+ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName());
 
 MessageId innerId = topicMessageId.getInnerMessageId();
 return consumer.doAcknowledge(innerId, ackType, properties)
@@ -511,7 +512,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase {
 }
 removeExpiredMessagesFromQueue(messageIds);
 messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
-

[GitHub] sijie closed pull request #2346: Issue #2330: change getTopicName in MultiTopicsConsumer

2018-08-27 Thread GitBox
sijie closed pull request #2346: Issue #2330: change getTopicName in 
MultiTopicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/2346
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index da04534d94..fe37b69a50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,8 @@
 private final SubscriptionInitialPosition subscriptionInitialPosition;
 private final ConnectionHandler connectionHandler;
 
+private final String topicNameWithoutPartition;
+
 enum SubscriptionMode {
 // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
 // position
@@ -203,6 +205,8 @@
 NonPersistentAcknowledgmentGroupingTracker.of();
 }
 
+topicNameWithoutPartition = topicName.getPartitionedTopicName();
+
 grabCnx();
 }
 
@@ -1458,6 +1462,10 @@ void grabCnx() {
 this.connectionHandler.grabCnx();
 }
 
+public String getTopicNameWithoutPartition() {
+return topicNameWithoutPartition;
+}
+
 private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 799bec8e4d..f1cb9cf262 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -231,7 +231,8 @@ private void messageReceived(ConsumerImpl consumer, Message message) {
 checkArgument(message instanceof MessageImpl);
 lock.writeLock().lock();
 try {
-TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message);
+TopicMessageImpl topicMessage = new TopicMessageImpl<>(
+consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 unAckedMessageTracker.add(topicMessage.getMessageId());
 
 if (log.isDebugEnabled()) {
@@ -370,7 +371,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
 }
 
 if (ackType == AckType.Cumulative) {
-Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
+Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
 if (individualConsumer != null) {
 MessageId innerId = topicMessageId.getInnerMessageId();
 return individualConsumer.acknowledgeCumulativeAsync(innerId);
@@ -378,7 +379,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
 return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
 }
 } else {
-ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName());
+ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName());
 
 MessageId innerId = topicMessageId.getInnerMessageId();
 return consumer.doAcknowledge(innerId, ackType, properties)
@@ -511,7 +512,7 @@ public void redeliverUnacknowledgedMessages(Set messageIds) {
 }
 removeExpiredMessagesFromQueue(messageIds);
 messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
-.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
+.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet()))
 .forEach((topicName, messageIds1) ->
 consumers.get(topicName)
 .redeliverUnacknowledgedMessages(messageIds1.stream()
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 071b804ffb..dd1b37dd33 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -18,20 +18,39 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+
 import java.util.Objects;
 import org.apache.pulsar.client.api.MessageId;
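
The multi-topics consumer change above looks up the per-partition consumer, and groups message ids for redelivery, by `getTopicPartitionName()` instead of `getTopicName()`. A minimal Java sketch of why the key matters, assuming (as the change suggests) that the consumers map is keyed by partition-qualified names; `GroupByPartitionSketch` and `TopicMsgId` are hypothetical simplifications, not Pulsar classes:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupByPartitionSketch {

    // Hypothetical, simplified stand-in for TopicMessageIdImpl: it keeps only
    // the base topic name, the partition-qualified name and an entry id.
    public static final class TopicMsgId {
        final String topicName;          // e.g. "orders"
        final String topicPartitionName; // e.g. "orders-partition-1"
        final long entryId;

        public TopicMsgId(String topicName, String topicPartitionName, long entryId) {
            this.topicName = topicName;
            this.topicPartitionName = topicPartitionName;
            this.entryId = entryId;
        }
    }

    // Groups entry ids under whatever key the classifier extracts.
    static Map<String, Set<Long>> groupBy(List<TopicMsgId> ids,
                                          Function<TopicMsgId, String> classifier) {
        return ids.stream().collect(Collectors.groupingBy(
                classifier,
                Collectors.mapping(id -> id.entryId, Collectors.toSet())));
    }

    // One group per partition: each key matches a per-partition consumer.
    public static Map<String, Set<Long>> groupByPartition(List<TopicMsgId> ids) {
        return groupBy(ids, id -> id.topicPartitionName);
    }

    // One group per base topic: all partitions collapse into a single key.
    public static Map<String, Set<Long>> groupByTopic(List<TopicMsgId> ids) {
        return groupBy(ids, id -> id.topicName);
    }

    public static void main(String[] args) {
        List<TopicMsgId> ids = List.of(
                new TopicMsgId("orders", "orders-partition-0", 1L),
                new TopicMsgId("orders", "orders-partition-1", 2L),
                new TopicMsgId("orders", "orders-partition-1", 3L));
        System.out.println(groupByPartition(ids).size()); // 2
        System.out.println(groupByTopic(ids).size());     // 1
    }
}
```

Grouping by the base name yields a single `orders` key, which presumably has no entry in a map of per-partition consumers; grouping by partition name keeps one redelivery batch per partition.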

[GitHub] sijie commented on issue #2434: Add pulsar flink sink connector

2018-08-27 Thread GitBox
sijie commented on issue #2434: Add pulsar flink sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416325086
 
 
   retest this please




[GitHub] sijie commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2451: Issue 2312: add python 
client multi-topics consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213071325
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar/__init__.py
 ##
 @@ -503,6 +503,211 @@ def my_listener(consumer, message):
 self._consumers.append(c)
 return c
 
+def subscribe_topics(self, topics, subscription_name,
 
 Review comment:
   @jiazhai can you open a new issue addressing this comment?




[incubator-pulsar] branch branch-2.1 updated: fix issue with protocol version not being correct (#2444)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new 96bf00f  fix issue with protocol version not being correct (#2444)
96bf00f is described below

commit 96bf00fa12bf74db57d36f40989e233752288351
Author: Boyang Jerry Peng 
AuthorDate: Fri Aug 24 22:40:58 2018 -0700

fix issue with protocol version not being correct (#2444)
---
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6a8ab74..b2d9478 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -232,8 +232,9 @@ public class ClientCnx extends PulsarHandler {
 if (log.isDebugEnabled()) {
 log.debug("{} Connection is ready", ctx.channel());
 }
-connectionFuture.complete(null);
+// set remote protocol version to the correct version before we complete the connection future
 remoteEndpointProtocolVersion = connected.getProtocolVersion();
+connectionFuture.complete(null);
 state = State.Ready;
 }
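
Completing a `CompletableFuture` runs the non-async dependent callbacks (such as `thenRun`) that are already registered on it synchronously, on the completing thread, inside `complete()`. That is why assigning `remoteEndpointProtocolVersion` before `connectionFuture.complete(null)` matters: a callback that reads the version would otherwise see the stale value. A minimal sketch with a hypothetical `Cnx` class (not Pulsar's `ClientCnx`); the version number 12 is arbitrary:

```java
import java.util.concurrent.CompletableFuture;

public class PublishBeforeCompleteSketch {

    // Hypothetical connection holder; field names are illustrative only.
    static final class Cnx {
        volatile int remoteProtocolVersion = 0;
        final CompletableFuture<Void> connectionFuture = new CompletableFuture<>();

        // Buggy order: registered callbacks run inside complete(),
        // before the version field has been assigned.
        void onConnectedBuggy(int version) {
            connectionFuture.complete(null);
            remoteProtocolVersion = version;
        }

        // Fixed order: publish the state first, then complete the future.
        void onConnectedFixed(int version) {
            remoteProtocolVersion = version;
            connectionFuture.complete(null);
        }
    }

    // Registers a callback, completes the connection, and returns the
    // version the callback observed.
    static int observedVersion(boolean fixed) {
        Cnx cnx = new Cnx();
        int[] seen = new int[1];
        cnx.connectionFuture.thenRun(() -> seen[0] = cnx.remoteProtocolVersion);
        if (fixed) {
            cnx.onConnectedFixed(12);
        } else {
            cnx.onConnectedBuggy(12);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(observedVersion(false)); // 0: callback ran too early
        System.out.println(observedVersion(true));  // 12
    }
}
```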
 



[GitHub] sijie commented on issue #2444: fix issue with protocol version not being correct

2018-08-27 Thread GitBox
sijie commented on issue #2444: fix issue with protocol version not being 
correct
URL: https://github.com/apache/incubator-pulsar/pull/2444#issuecomment-416323693
 
 
   merged as 96bf00f in branch-2.1




[GitHub] sijie commented on issue #2439: leak assign schema in MessageImpl constructor

2018-08-27 Thread GitBox
sijie commented on issue #2439: leak assign schema in MessageImpl constructor
URL: https://github.com/apache/incubator-pulsar/pull/2439#issuecomment-416323334
 
 
   merged as acd77a0 in branch-2.1




[incubator-pulsar] branch branch-2.1 updated: leak assign schema in MessageImpl constructor (#2439)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new acd77a0  leak assign schema in MessageImpl constructor (#2439)
acd77a0 is described below

commit acd77a0fe6f09de1919040b65a90fb145f7f1bfa
Author: Jia Zhai 
AuthorDate: Fri Aug 24 11:42:55 2018 +0800

leak assign schema in MessageImpl constructor (#2439)
---
 .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5ea8bfe..96b68c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -155,6 +155,7 @@ public class MessageImpl implements Message {
 this.cnx = null;
 this.payload = payload;
 this.properties = Collections.unmodifiableMap(properties);
+this.schema = schema;
 }
 
 public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {



[incubator-pulsar] branch branch-2.1 updated: Increased default brokerShutdownTimeout to 60 seconds (#2377)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new caf8d67  Increased default brokerShutdownTimeout to 60 seconds (#2377)
caf8d67 is described below

commit caf8d676f0a49896269b57032bd07ae3b4287564
Author: Matteo Merli 
AuthorDate: Wed Aug 15 11:50:18 2018 -0700

Increased default brokerShutdownTimeout to 60 seconds (#2377)

 ### Motivation

The default timeout for a graceful broker shutdown is 3 seconds. This leaves 
little room for a graceful shutdown when the broker is serving a lot of topics.

There is no big downside to increasing the timeout to a much larger value.
---
 conf/broker.conf| 2 +-
 conf/standalone.conf| 2 +-
 deployment/terraform-ansible/templates/broker.conf  | 2 +-
 .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2e40d82..cd4a505 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -53,7 +53,7 @@ failureDomainsEnabled=false
 zooKeeperSessionTimeoutMillis=30000
 
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
-brokerShutdownTimeoutMs=3000
+brokerShutdownTimeoutMs=60000
 
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 0d7a395..cc03b95 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -46,7 +46,7 @@ failureDomainsEnabled=false
 zooKeeperSessionTimeoutMillis=30000
 
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
-brokerShutdownTimeoutMs=3000
+brokerShutdownTimeoutMs=60000
 
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 73d4e2f..a7456a4 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -53,7 +53,7 @@ failureDomainsEnabled=false
 zooKeeperSessionTimeoutMillis=30000
 
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
-brokerShutdownTimeoutMs=3000
+brokerShutdownTimeoutMs=60000
 
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1bb6fff..ddc7728 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -78,7 +78,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
 // Time to wait for broker graceful shutdown. After this time elapses, the
 // process will be killed
 @FieldContext(dynamic = true)
-private long brokerShutdownTimeoutMs = 3000;
+private long brokerShutdownTimeoutMs = 60000;
 // Enable backlog quota check. Enforces action on topic when the quota is
 // reached
 private boolean backlogQuotaCheckEnabled = true;
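
The new default gives the broker 60000 ms (60 s) instead of 3000 ms to drain before the process is killed. The pattern being bounded is "let work drain until a deadline, then force the rest down". A minimal sketch of that pattern with `java.util.concurrent.ExecutorService`; it illustrates the idea only and is not how the Pulsar broker implements its shutdown (the sleeping tasks are made-up stand-ins for closing topics, and the timings are scaled down):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    // Lets outstanding work drain for at most timeoutMs, then forces the rest
    // down. Returns true if everything finished before the deadline.
    static boolean shutdownWithTimeout(ExecutorService executor, long timeoutMs) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true; // graceful path: all work completed in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        executor.shutdownNow(); // deadline passed: interrupt what is still running
        return false;
    }

    // Hypothetical unit of shutdown work, e.g. closing one topic.
    static Runnable work(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cut short by shutdownNow()
            }
        };
    }

    public static void main(String[] args) {
        // Same 500 ms of work under a short and a generous budget.
        ExecutorService tight = Executors.newSingleThreadExecutor();
        tight.submit(work(500));
        System.out.println("100 ms budget, graceful: " + shutdownWithTimeout(tight, 100));

        ExecutorService generous = Executors.newSingleThreadExecutor();
        generous.submit(work(500));
        System.out.println("2000 ms budget, graceful: " + shutdownWithTimeout(generous, 2_000));
    }
}
```

A larger budget costs nothing when work finishes early, since `awaitTermination` returns as soon as the executor terminates; it only matters when draining is genuinely slow.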



[GitHub] sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty

2018-08-27 Thread GitBox
sijie commented on issue #2372: Kinesis-sink consider topic-name as 
partition-key if record key empty
URL: https://github.com/apache/incubator-pulsar/pull/2372#issuecomment-416319348
 
 
   merged as 703305b in branch-2.1




[GitHub] sijie commented on issue #2420: [client] add properties to producer for cpp & python client

2018-08-27 Thread GitBox
sijie commented on issue #2420: [client] add properties to producer for cpp & 
python client
URL: https://github.com/apache/incubator-pulsar/pull/2420#issuecomment-416321563
 
 
   cherry-pick as d7ec1c5 in branch-2.1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[incubator-pulsar] branch branch-2.1 updated: [client] add properties to producer for cpp & python client (#2420)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new d7ec1c5  [client] add properties to producer for cpp & python client 
(#2420)
d7ec1c5 is described below

commit d7ec1c5051ee3f7066cd1ba1572606760a3c371e
Author: Sijie Guo 
AuthorDate: Wed Aug 22 09:32:46 2018 -0700

[client] add properties to producer for cpp & python client (#2420)

* [client] add properties to producer for cpp & python client

 ### Motivation

This is a catch-up change to enable properties on the producer, as in the Java 
client.

 ### Changes

Enable properties on producer for both cpp & python client

 ### Results

Properties are added as metadata for CommandProducer. However, there is no way
to verify the producer properties, so I didn't add any specific tests; I just
added properties for both the cpp and python clients in the tests, which should
exercise the corresponding code path.

* Add `properties` to pydoc
---
 .../include/pulsar/ProducerConfiguration.h | 34 ++
 pulsar-client-cpp/lib/Commands.cc  | 11 ++-
 pulsar-client-cpp/lib/Commands.h   |  3 +-
 pulsar-client-cpp/lib/ProducerConfiguration.cc | 34 +-
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |  1 +
 pulsar-client-cpp/lib/ProducerImpl.cc  |  3 +-
 pulsar-client-cpp/python/pulsar/__init__.py| 11 ++-
 pulsar-client-cpp/python/src/config.cc |  1 +
 pulsar-client-cpp/python/test_producer.py  |  6 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   |  2 ++
 10 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9f7bf1f..45154c5 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -127,6 +127,40 @@ class ProducerConfiguration {
 bool isEncryptionEnabled() const;
 ProducerConfiguration& addEncryptionKey(std::string key);
 
+/**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+bool hasProperty(const std::string& name) const;
+
+/**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+const std::string& getProperty(const std::string& name) const;
+
+/**
+ * Get all the properties attached to this producer.
+ */
+std::map<std::string, std::string>& getProperties() const;
+
+/**
+ * Sets a new property on a message.
+ * @param name   the name of the property
+ * @param value  the associated value
+ */
+ProducerConfiguration& setProperty(const std::string& name, const std::string& value);
+
+/**
+ * Add all the properties in the provided map
+ */
+ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+
 friend class PulsarWrapper;
 
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 8bd0128..4d9f1be 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
 #include "pulsar/MessageBuilder.h"
 #include "PulsarApi.pb.h"
 #include "LogUtils.h"
+#include "PulsarApi.pb.h"
 #include "Utils.h"
 #include "Url.h"
 #include "checksum/ChecksumProvider.h"
@@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
 }
 
 SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
-   const std::string& producerName, uint64_t requestId) {
+   const std::string& producerName, uint64_t requestId,
+   const std::map<std::string, std::string>& metadata) {
 BaseCommand cmd;
 cmd.set_type(BaseCommand::PRODUCER);
 CommandProducer* producer = cmd.mutable_producer();
 producer->set_topic(topic);
 producer->set_producer_id(producerId);
 producer->set_request_id(requestId);
+for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
+ it++) {
+proto::KeyValue* keyValue = proto::KeyValue().New();
+keyValue->set_key(it->first);
+keyValue->set_value(it->second);
+producer->mutable_metadata()->AddAllocated(keyValue);
+}
 
 if (!producerName.empty()) {
 producer->set_producer_name(producerName);
diff --git a/pulsar-client-cpp/lib/Commands.h 
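
`Commands::newProducer` forwards the configured producer properties as the repeated `metadata` field of `CommandProducer`, one key/value entry per map entry. The same flattening, sketched in Java for illustration; `KeyValue` is a hypothetical plain class rather than the generated protobuf type. Since a C++ `const std::string&` cannot be null, the sketch's lookup returns an empty string for a missing name; that choice is an assumption, not necessarily what `getProperty` does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PropertiesToMetadataSketch {

    // Hypothetical stand-in for the protobuf KeyValue message.
    public static final class KeyValue {
        public final String key;
        public final String value;

        public KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // One KeyValue per property, in the map's iteration order; this mirrors
    // the loop that fills the repeated metadata field of CommandProducer.
    public static List<KeyValue> toMetadata(Map<String, String> properties) {
        List<KeyValue> metadata = new ArrayList<>(properties.size());
        for (Map.Entry<String, String> e : properties.entrySet()) {
            metadata.add(new KeyValue(e.getKey(), e.getValue()));
        }
        return metadata;
    }

    // Lookup that returns "" for a missing name (an assumption of this sketch).
    public static String getProperty(Map<String, String> properties, String name) {
        return properties.getOrDefault(name, "");
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>(); // sorted by key, like std::map
        props.put("app", "billing");
        props.put("env", "prod");
        for (KeyValue kv : toMetadata(props)) {
            System.out.println(kv.key + "=" + kv.value); // app=billing, then env=prod
        }
    }
}
```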

[incubator-pulsar] branch branch-2.1 updated: [client] add properties to consumer for cpp & python client (#2423)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new ffde1bc  [client] add properties to consumer for cpp & python client 
(#2423)
ffde1bc is described below

commit ffde1bcd45c73e94d5f41a05fc6fb2c6f31d3764
Author: Sijie Guo 
AuthorDate: Wed Aug 22 02:41:00 2018 -0700

[client] add properties to consumer for cpp & python client (#2423)

* [client] add properties to consumer for cpp & python client

 ### Motivation

This is a catch-up change to enable properties on the consumer, as in the Java 
client.

 ### Changes

Enable properties on consumer for both cpp & python client

 ### Results

Properties are added as metadata for CommandSubscribe. However, there is no way
to verify the consumer properties, so I didn't add any specific tests; I just
added properties for both the cpp and python clients in the tests, which should
exercise the corresponding code path.

* remove "make format"
---
 .../include/pulsar/ConsumerConfiguration.h | 34 ++
 pulsar-client-cpp/lib/Commands.cc  | 12 +++-
 pulsar-client-cpp/lib/Commands.h   |  2 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc | 32 
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |  4 ++-
 pulsar-client-cpp/lib/ConsumerImpl.cc  |  2 +-
 pulsar-client-cpp/python/pulsar/__init__.py| 11 ++-
 pulsar-client-cpp/python/src/config.cc |  1 +
 pulsar-client-cpp/python/test_consumer.py  |  6 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   |  6 +++-
 10 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 36e5808..0687166 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -162,6 +162,40 @@ class ConsumerConfiguration {
 void setPatternAutoDiscoveryPeriod(int periodInSeconds);
 int getPatternAutoDiscoveryPeriod() const;
 
+/**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+bool hasProperty(const std::string& name) const;
+
+/**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+const std::string& getProperty(const std::string& name) const;
+
+/**
+ * Get all the properties attached to this producer.
+ */
+std::map<std::string, std::string>& getProperties() const;
+
+/**
+ * Sets a new property on a message.
+ * @param name   the name of the property
+ * @param value  the associated value
+ */
+ConsumerConfiguration& setProperty(const std::string& name, const std::string& value);
+
+/**
+ * Add all the properties in the provided map
+ */
+ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+
 friend class PulsarWrapper;
 
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 8a1933b..8bd0128 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -20,6 +20,7 @@
 #include "MessageImpl.h"
 #include "Version.h"
 #include "pulsar/MessageBuilder.h"
+#include "PulsarApi.pb.h"
 #include "LogUtils.h"
 #include "Utils.h"
 #include "Url.h"
@@ -27,6 +28,7 @@
 #include 
 #include 
 
+using namespace pulsar;
 namespace pulsar {
 
 using namespace pulsar::proto;
@@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
 SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
 uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
 const std::string& consumerName, SubscriptionMode subscriptionMode,
-Optional<MessageId> startMessageId, bool readCompacted) {
+Optional<MessageId> startMessageId, bool readCompacted,
+const std::map<std::string, std::string>& metadata) {
 BaseCommand cmd;
 cmd.set_type(BaseCommand::SUBSCRIBE);
 CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
 messageIdData.set_batch_index(startMessageId.value().batchIndex());
 }
 }
+for (std::map<std::string, std::string>::const_iterator it

[GitHub] sijie commented on issue #2399: Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl

2018-08-27 Thread GitBox
sijie commented on issue #2399: Fixed initialization order of 
acknowledgmentsGroupingTracker in ConsumerImpl
URL: https://github.com/apache/incubator-pulsar/pull/2399#issuecomment-416320663
 
 
   merged as 93b6d20 in branch-2.1




[GitHub] sijie commented on issue #2387: Fixed race condition during expansion of concurrent open hash maps

2018-08-27 Thread GitBox
sijie commented on issue #2387: Fixed race condition during expansion of 
concurrent open hash maps
URL: https://github.com/apache/incubator-pulsar/pull/2387#issuecomment-416320271
 
 
   cherry-pick as 6146ca9 to branch-2.1




[GitHub] sijie commented on issue #2377: Increased default brokerShutdownTimeout to 60 seconds

2018-08-27 Thread GitBox
sijie commented on issue #2377: Increased default brokerShutdownTimeout to 60 
seconds
URL: https://github.com/apache/incubator-pulsar/pull/2377#issuecomment-416319699
 
 
   merged as 7416fc0 in branch-2.1




[incubator-pulsar] branch branch-2.1 updated: Fixed race condition during expansion of concurrent open hash maps (#2387)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new 6146ca9  Fixed race condition during expansion of concurrent open hash 
maps (#2387)
6146ca9 is described below

commit 6146ca9cc4c307bf4b19915ca6f13280efd7d428
Author: Matteo Merli 
AuthorDate: Fri Aug 17 08:47:30 2018 -0700

Fixed race condition during expansion of concurrent open hash maps (#2387)

### Motivation

Port the same fix as https://github.com/apache/bookkeeper/pull/1607 to 
correct the issue reported in https://github.com/apache/bookkeeper/issues/1606.

There is a race condition in the concurrent open hash map implementations. 
The race happens when a map is rehashed after an expansion and the new 
arrays replace the old ones.

The race is that a thread doing a `get()` on the map first checks the 
current capacity of the map, uses it to find the bucket, and then tries an 
optimistic read of the value in that bucket.

This assumes the `capacity` update becomes visible only after the `values` 
array has been swapped, but that is not always the case in the current code.

### Changes

 * Use the `volatile` qualifier for `capacity` and the `values` arrays to ensure the ordering of memory reads is respected by the compiler
 * In rehashing, update `capacity` after `values` where that was not already the case
---
 .../util/collections/ConcurrentLongHashMap.java|  8 +--
 .../util/collections/ConcurrentLongPairSet.java| 15 +++---
 .../util/collections/ConcurrentOpenHashMap.java|  6 +--
 .../util/collections/ConcurrentOpenHashSet.java|  6 +--
 .../collections/ConcurrentLongHashMapTest.java | 57 --
 5 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index fcb0c10..60c24c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Map from long to an Object.
- * 
+ *
  * Provides similar methods as a ConcurrentMap with 2 differences:
  * 
  * No boxing/unboxing from long -> Long
@@ -187,10 +187,10 @@ public class ConcurrentLongHashMap {
 // A section is a portion of the hash map that is covered by a single
 @SuppressWarnings("serial")
 private static final class Section extends StampedLock {
-private long[] keys;
-private V[] values;
+private volatile long[] keys;
+private volatile V[] values;
 
-private int capacity;
+private volatile int capacity;
 private volatile int size;
 private int usedBuckets;
 private int resizeThreshold;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 74d4314..4634b40 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -26,7 +26,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.StampedLock;
-import java.util.function.Predicate;
 
 /**
  * Concurrent hash set where values are composed of pairs of longs.
@@ -163,11 +162,11 @@ public class ConcurrentLongPairSet {
 
 /**
  * Removes all of the elements of this collection that satisfy the given predicate.
- * 
+ *
  * @param filter
   *a predicate which returns {@code true} for elements to be removed
  * @return {@code true} if any elements were removed
- * 
+ *
  * @return number of removed values
  */
 public int removeIf(LongPairPredicate filter) {
@@ -209,9 +208,9 @@ public class ConcurrentLongPairSet {
 @SuppressWarnings("serial")
 private static final class Section extends StampedLock {
 // Keys and values are stored interleaved in the table array
-private long[] table;
+private volatile long[] table;
 
-private int capacity;
+private volatile int capacity;
 private volatile int size;
 private int usedBuckets;
 private int resizeThreshold;
@@ -449,9 +448,11 @@ public class ConcurrentLongPairSet {
 }
 }
 
-capacity = newCapacity;
 table = newTable;
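
The fix relies on publication order: the resizing writer stores the new arrays before the new `capacity`, all `volatile`, and readers load `capacity` before the arrays. A reader that observes the new capacity is then guaranteed to observe arrays at least that long, so its bucket index cannot run past the end of an old array; any inconsistent snapshot is caught by `StampedLock.validate` and retried under the read lock. A minimal sketch of that protocol on a toy direct-mapped table (hypothetical; not the Pulsar or BookKeeper code, which uses open addressing with probing):

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.StampedLock;

public class VolatilePublishSketch {

    // Toy section: a direct-mapped table from non-negative int keys to long
    // values that doubles its capacity whenever two keys share a bucket.
    @SuppressWarnings("serial")
    public static final class Section extends StampedLock {
        private static final int EMPTY = -1;

        private volatile int[] keys = emptyKeys(4);
        private volatile long[] values = new long[4];
        private volatile int capacity = 4;

        private static int[] emptyKeys(int n) {
            int[] k = new int[n];
            Arrays.fill(k, EMPTY);
            return k;
        }

        // Returns the value stored for key, or -1 if the key is absent.
        public long get(int key) {
            long stamp = tryOptimisticRead();
            long result = lookup(key);
            if (!validate(stamp)) {
                // A writer ran concurrently: redo the lookup under the read lock.
                stamp = readLock();
                try {
                    result = lookup(key);
                } finally {
                    unlockRead(stamp);
                }
            }
            return result;
        }

        // Capacity is read BEFORE the arrays. The writer publishes the arrays
        // before the capacity, so a new capacity implies arrays at least that
        // long and the bucket index below is never out of bounds.
        private long lookup(int key) {
            int cap = capacity;
            int[] k = keys;
            long[] v = values;
            int bucket = key & (cap - 1);
            return k[bucket] == key ? v[bucket] : -1L;
        }

        public void put(int key, long value) { // key must be >= 0
            long stamp = writeLock();
            try {
                // Distinct keys differ in some low bit, so doubling eventually
                // separates them (a toy policy; real maps probe instead).
                while (keys[key & (capacity - 1)] != EMPTY && keys[key & (capacity - 1)] != key) {
                    rehash(capacity * 2);
                }
                int bucket = key & (capacity - 1);
                keys[bucket] = key;
                values[bucket] = value;
            } finally {
                unlockWrite(stamp);
            }
        }

        private void rehash(int newCapacity) {
            int[] newKeys = emptyKeys(newCapacity);
            long[] newValues = new long[newCapacity];
            for (int i = 0; i < capacity; i++) {
                if (keys[i] != EMPTY) {
                    int bucket = keys[i] & (newCapacity - 1);
                    newKeys[bucket] = keys[i];
                    newValues[bucket] = values[i];
                }
            }
            keys = newKeys;         // publish the arrays first...
            values = newValues;
            capacity = newCapacity; // ...and the capacity last
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Section section = new Section();
        section.put(1, 10L);
        AtomicBoolean wrong = new AtomicBoolean(false);
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                if (section.get(1) != 10L) {
                    wrong.set(true);
                }
            }
        });
        reader.start();
        for (int key = 2; key < 10_000; key++) {
            section.put(key, key * 10L); // triggers repeated rehashing
        }
        reader.join();
        System.out.println("reader saw a wrong value: " + wrong.get());
    }
}
```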
 

[incubator-pulsar] branch branch-2.1 updated: Kinesis-sink consider topic-name as partition-key if record key empty (#2372)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new 703305b  Kinesis-sink consider topic-name as partition-key if record 
key empty (#2372)
703305b is described below

commit 703305b5426856bab7bab30a41e4f242e7782dc7
Author: Rajan Dhabalia 
AuthorDate: Tue Aug 14 15:41:45 2018 -0700

Kinesis-sink consider topic-name as partition-key if record key empty 
(#2372)
---
 .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 67de21a..cdfadde 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -114,7 +114,7 @@ public class KinesisSink implements Sink {
 record.getRecordSequence());
 throw new IllegalStateException("kinesis queue has publish failure");
 }
-String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
+String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
 partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
 : partitionedKey; // partitionedKey Length must be at least one, and at most 256

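The new fallback order is: record key, then topic name, then `defaultPartitionedKey`, followed by truncation to the length limit noted in the code comment (at most 256). A minimal Java sketch of that selection, with hypothetical names; note that it truncates to exactly `maxLength` characters, whereas the diff keeps `maxPartitionedKeyLength - 1`, and it uses `orElseGet` so the fallback is only computed when the key is absent:

```java
import java.util.Optional;

public class PartitionKeySketch {

    // Picks the partition key: the record key if present, otherwise the topic
    // name, otherwise a configured default; then caps it at maxLength chars.
    static String partitionKey(Optional<String> recordKey, Optional<String> topicName,
                               String defaultKey, int maxLength) {
        // orElseGet defers the fallback chain until the record key is absent.
        String key = recordKey.orElseGet(() -> topicName.orElse(defaultKey));
        return key.length() > maxLength ? key.substring(0, maxLength) : key;
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/events";
        System.out.println(partitionKey(Optional.of("user-42"), Optional.of(topic), "default", 256)); // user-42
        System.out.println(partitionKey(Optional.empty(), Optional.of(topic), "default", 256));       // the topic name
        System.out.println(partitionKey(Optional.empty(), Optional.empty(), "default", 256));         // default
    }
}
```

Using the topic name as the fallback spreads keyless records from different topics across Kinesis shards, instead of sending all of them to the single shard that owns the default key.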


[incubator-pulsar] branch branch-2.1 updated: [compaction] make topic compaction works with partitioned topic (#2367)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
 new f97a8f7  [compaction] make topic compaction works with partitioned 
topic (#2367)
f97a8f7 is described below

commit f97a8f7d136ddc609d01ce54e20f45e9949b04df
Author: Sijie Guo 
AuthorDate: Tue Aug 14 11:02:24 2018 -0700

[compaction] make topic compaction works with partitioned topic (#2367)

* [compaction] make topic compaction works with partitioned topic

 ### Motivation

Topic compaction doesn't work with partitioned topics.

 ### Changes

- make `RawReaderImpl` and `ReaderImpl` return messages with the partition index
- make broker service `Consumer` deliver MessageIdData with the partition index
- add an integration test to ensure compaction works with partitioned topics
---
 .../java/org/apache/pulsar/broker/service/Consumer.java  |  9 +++--
 .../org/apache/pulsar/client/impl/RawReaderImpl.java | 16 
 .../org/apache/pulsar/compaction/TwoPhaseCompactor.java  |  8 
 .../PersistentDispatcherFailoverConsumerTest.java|  2 +-
 .../java/org/apache/pulsar/client/impl/ReaderImpl.java   |  4 +++-
 5 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c3befa5..4c54018 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -73,6 +73,7 @@ public class Consumer {
 private final String appId;
 private AuthenticationDataSource authenticationData;
 private final String topicName;
+private final int partitionIdx;
 private final InitialPosition subscriptionInitialPosition;
 
 private final long consumerId;
@@ -119,6 +120,7 @@ public class Consumer {
 this.subscription = subscription;
 this.subType = subType;
 this.topicName = topicName;
+this.partitionIdx = TopicName.getPartitionIndex(topicName);
 this.consumerId = consumerId;
 this.priorityLevel = priorityLevel;
 this.readCompacted = readCompacted;
@@ -239,8 +241,11 @@ public class Consumer {
 Entry entry = entries.get(i);
 PositionImpl pos = (PositionImpl) entry.getPosition();
 MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
-MessageIdData messageId = messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
-.build();
+MessageIdData messageId = messageIdBuilder
+.setLedgerId(pos.getLedgerId())
+.setEntryId(pos.getEntryId())
+.setPartition(partitionIdx)
+.build();
 
 ByteBuf metadataAndPayload = entry.getDataBuffer();
 // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index e768c3e..ae1a4db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader {
 
 RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
 CompletableFuture> consumerFuture) {
-super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
-consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES);
+super(client,
+conf.getSingleTopic(),
+conf,
+client.externalExecutorProvider().getExecutor(),
+TopicName.getPartitionIndex(conf.getSingleTopic()),
+consumerFuture,
+SubscriptionMode.Durable,
+MessageId.earliest,
+Schema.BYTES);
 incomingRawMessages = new GrowableArrayBlockingQueue<>();
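With this change the broker stamps every outgoing `MessageIdData` with the partition index derived from the consumer's topic name, and `RawConsumerImpl` passes that index instead of `-1`. A minimal sketch of such a derivation, assuming the `-partition-N` suffix convention for partition topics and `-1` for non-partitioned topics; the class and method names are hypothetical and this is not Pulsar's `TopicName` implementation:

```java
// Hypothetical helper illustrating how a partition index can be derived from a
// topic name, assuming partitions are named "<topic>-partition-<N>".
final class PartitionIndexSketch {
    static final String PARTITION_SUFFIX = "-partition-";

    /** Returns N for "...-partition-N", or -1 for a non-partitioned topic. */
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1; // no partition suffix: treat as non-partitioned
        }
        try {
            return Integer.parseInt(topic.substring(pos + PARTITION_SUFFIX.length()));
        } catch (NumberFormatException e) {
            return -1; // suffix present but not followed by a number
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionIndex("persistent://public/default/orders-partition-3"));
        System.out.println(partitionIndex("persistent://public/default/orders"));
    }
}
```

Carrying the index in the message ID lets a client that reads several partitions tell which partition a message came from without re-parsing the topic.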
 

[GitHub] sijie commented on issue #2311: [cpp] receiver queue size config across partitions in multi-topics-consumer

2018-08-27 Thread GitBox
sijie commented on issue #2311: [cpp] receiver queue size config across partitions in multi-topics-consumer
URL: https://github.com/apache/incubator-pulsar/pull/2311#issuecomment-416318241
 
 
   merged as bcb3d3f in master




[incubator-pulsar] 02/02: [cpp] receiver queue size config across partitions in multi-topics-consumer (#2311)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit bcb3d3f6d15995b74c51d0507fece569361928da
Author: Jia Zhai 
AuthorDate: Tue Aug 14 06:05:59 2018 +0800

[cpp] receiver queue size config across partitions in multi-topics-consumer (#2311)

* catch up receiver queue size support in multi topics consumer

* add python config
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  5 -
 pulsar-client-cpp/python/pulsar/__init__.py| 13 
 pulsar-client-cpp/python/src/config.cc |  4 
 .../client/impl/MultiTopicsConsumerImpl.java   | 23 +-
 4 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c..6750273 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
 
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
 config.setMessageListener(
 boost::bind(::messageReceived, shared_from_this(), _1, _2));
-config.setReceiverQueueSize(conf_.getReceiverQueueSize());
 
 int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1;
+// Apply total limit of receiver queue size across partitions
+config.setReceiverQueueSize(
+std::min(conf_.getReceiverQueueSize(),
+ (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
 
 Lock lock(mutex_);
 topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07..f3b560b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ class Client:
 send_timeout_millis=3,
 compression_type=CompressionType.NONE,
 max_pending_messages=1000,
+max_pending_messages_across_partitions=5,
 block_if_queue_full=False,
 batching_enabled=False,
 batching_max_messages=1000,
@@ -352,6 +353,9 @@ class Client:
 * `max_pending_messages`:
   Set the max size of the queue holding the messages pending to receive
   an acknowledgment from the broker.
+* `max_pending_messages_across_partitions`:
+  Set the max size of the queue holding the messages pending to receive
+  an acknowledgment across partitions from the broker.
 * `block_if_queue_full`: Set whether `send_async` operations should
   block when the outgoing message queue is full.
 * `message_routing_mode`:
@@ -364,6 +368,7 @@ class Client:
 _check_type(int, send_timeout_millis, 'send_timeout_millis')
 _check_type(CompressionType, compression_type, 'compression_type')
 _check_type(int, max_pending_messages, 'max_pending_messages')
+_check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
 _check_type(bool, block_if_queue_full, 'block_if_queue_full')
 _check_type(bool, batching_enabled, 'batching_enabled')
 _check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ class Client:
 conf.send_timeout_millis(send_timeout_millis)
 conf.compression_type(compression_type)
 conf.max_pending_messages(max_pending_messages)
+conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
 conf.block_if_queue_full(block_if_queue_full)
 conf.batching_enabled(batching_enabled)
 conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ class Client:
   consumer_type=ConsumerType.Exclusive,
   message_listener=None,
   receiver_queue_size=1000,
+  max_total_receiver_queue_size_across_partitions=5,
   consumer_name=None,
   unacked_messages_timeout_ms=None,
   broker_consumer_stats_cache_time_ms=3,
@@ -434,6 +441,9 @@ class Client:
   should not be interrupted when the consumer queue size is zero. The
   default value is 1000 messages and should work well for most use
   cases.
+* `max_total_receiver_queue_size_across_partitions`
+  Set the max total receiver queue size across partitions.
+  This setting will be used to reduce the receiver queue size for individual partitions
 * 
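The C++ change caps each partition's receiver queue at an equal share of `maxTotalReceiverQueueSizeAcrossPartitions`, and a topic that reports fewer than one partition is counted as a single partition. The same arithmetic, sketched in Java with hypothetical names and example values chosen only for illustration (integer division, as in the C++ code):

```java
// Hypothetical sketch of the per-partition receiver queue cap applied in
// MultiTopicsConsumerImpl::subscribeTopicPartitions (C++), expressed in Java.
final class ReceiverQueueSketch {
    /** Partition count used for sizing; non-partitioned topics count as one. */
    static int effectivePartitions(int reportedPartitions) {
        return reportedPartitions >= 1 ? reportedPartitions : 1;
    }

    /** The configured queue size, capped by an equal share of the cross-partition total. */
    static int perPartitionQueueSize(int receiverQueueSize,
                                     int maxTotalAcrossPartitions,
                                     int reportedPartitions) {
        int numPartitions = effectivePartitions(reportedPartitions);
        return Math.min(receiverQueueSize, maxTotalAcrossPartitions / numPartitions);
    }

    public static void main(String[] args) {
        // Example values only; they are not the client's defaults.
        System.out.println(perPartitionQueueSize(1000, 50000, 4));
        System.out.println(perPartitionQueueSize(1000, 50000, 100));
        System.out.println(perPartitionQueueSize(1000, 50000, 0));
    }
}
```

With few partitions the per-consumer setting wins; once the partition count grows large enough, the total budget divided by the partition count becomes the binding limit, which bounds the consumer's overall memory use.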

[incubator-pulsar] branch branch-2.1 updated (b070cfd -> bcb3d3f)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a change to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


from b070cfd  [docker] Publish pulsar-all image to docker hub (#2361)
 new 068db59  Fix: function with multi-topic not acking on effectively-once (#2347)
 new bcb3d3f  [cpp] receiver queue size config across partitions in multi-topics-consumer (#2311)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/PartitionedProducerConsumerTest.java   |  9 ++---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 10 +++---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  5 ++-
 pulsar-client-cpp/python/pulsar/__init__.py| 13 +++
 pulsar-client-cpp/python/src/config.cc |  4 +++
 .../client/impl/MultiTopicsConsumerImpl.java   | 40 ++
 .../pulsar/functions/source/PulsarSource.java  | 24 +++--
 7 files changed, 67 insertions(+), 38 deletions(-)



[incubator-pulsar] 01/02: Fix: function with multi-topic not acking on effectively-once (#2347)

2018-08-27 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 068db598adb529d1ae441084c280f0e776d2cbbc
Author: Rajan Dhabalia 
AuthorDate: Mon Aug 13 14:19:46 2018 -0700

Fix: function with multi-topic not acking on effectively-once (#2347)

 ### Motivation

`MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync`; therefore, a function with multiple input topics and `EFFECTIVELY_ONCE` processing does not ack messages, which breaks the `EFFECTIVELY_ONCE` behavior.

 ### Modifications

The function should ack each message on the specific per-topic consumer when `inputTopicConsumer` is a multi-topic consumer.

 ### Result

The function should be able to ack messages from a multi-topic consumer when the processing guarantee is `EFFECTIVELY_ONCE`.
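The fix routes each cumulative ack to the consumer of the topic the message came from, since cumulative acknowledgment is only meaningful within a single topic's ordering. A dependency-free model of that routing; `TopicConsumer`, `MultiTopicConsumer` and the `long` sequence IDs are hypothetical stand-ins, not Pulsar client types:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: none of these types are Pulsar APIs.
final class PerTopicAckSketch {
    /** Stand-in for a single-topic consumer that supports cumulative acks. */
    static final class TopicConsumer {
        long lastCumulativeAck = -1; // -1 means nothing acked yet

        void acknowledgeCumulative(long sequenceId) {
            // A cumulative ack covers everything up to sequenceId on this topic only.
            lastCumulativeAck = Math.max(lastCumulativeAck, sequenceId);
        }
    }

    /** Stand-in for a multi-topic consumer holding one consumer per input topic. */
    static final class MultiTopicConsumer {
        final Map<String, TopicConsumer> consumers = new HashMap<>();

        TopicConsumer consumerFor(String topic) {
            TopicConsumer c = consumers.get(topic);
            if (c == null) {
                throw new IllegalArgumentException("not subscribed to " + topic);
            }
            return c;
        }
    }

    /** Applies the cumulative ack on the per-topic consumer, never across topics. */
    static void ackCumulative(MultiTopicConsumer multi, String topic, long sequenceId) {
        multi.consumerFor(topic).acknowledgeCumulative(sequenceId);
    }

    public static void main(String[] args) {
        MultiTopicConsumer multi = new MultiTopicConsumer();
        multi.consumers.put("in-1", new TopicConsumer());
        multi.consumers.put("in-2", new TopicConsumer());
        ackCumulative(multi, "in-1", 7);
        ackCumulative(multi, "in-2", 3);
        System.out.println(multi.consumerFor("in-1").lastCumulativeAck);
        System.out.println(multi.consumerFor("in-2").lastCumulativeAck);
    }
}
```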
---
 .../api/PartitionedProducerConsumerTest.java   |  9 ++--
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 10 +
 .../client/impl/MultiTopicsConsumerImpl.java   | 17 +--
 .../pulsar/functions/source/PulsarSource.java  | 24 --
 4 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532..ae7757d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 }
 
 try {
-producer = pulsarClient.newProducer().topic(topicName.toString())
-.enableBatching(false)
-.messageRoutingMode(MessageRoutingMode.SinglePartition)
-.create();
+producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
 producer.send("message1".getBytes());
 producer.send("message2".getBytes());
 /* Message msg1 = */ consumer.receive();
 Message msg2 = consumer.receive();
 consumer.acknowledgeCumulative(msg2);
-Assert.fail("should fail since ack cumulative is not supported for partitioned topic");
-} catch (PulsarClientException e) {
-Assert.assertTrue(e instanceof PulsarClientException.NotSupportedException);
 } finally {
 producer.close();
 consumer.unsubscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5398bc9..2001981 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public class PulsarSinkE2ETest {
 retryStrategically((test) -> {
 try {
 SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
-return subStats.unackedMessages == 0;
+return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
 } catch (PulsarAdminException e) {
 return false;
 }
-}, 5, 500);
+}, 5, 200);
 
 FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
 functionRuntimeManager.updateRates();
@@ -400,11 +401,12 @@ public class PulsarSinkE2ETest {
 functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
 functionDetailsBuilder.setParallelism(1);
 functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
 
 // set source spec
 // source spec classname should be empty so that the default pulsar source will be used
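The updated E2E test polls until the subscription has no unacked messages and `msgThroughputOut` equals `totalMsgs`, with a shorter sleep between attempts. A minimal sketch of that polling pattern, assuming a linear back-off; the helper below is hypothetical and is not the test suite's `retryStrategically`:

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper: name, signature and linear back-off are assumptions.
final class RetrySketch {
    /**
     * Evaluates the condition up to retryCount times, sleeping sleepMillis * attempt
     * between failed attempts. Returns true as soon as the condition holds.
     */
    static boolean retry(BooleanSupplier condition, int retryCount, long sleepMillis) {
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (attempt < retryCount) {
                try {
                    Thread.sleep(sleepMillis * attempt);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long[] unacked = {2};
        long[] delivered = {8};
        long totalMsgs = 10;
        // Both conditions must hold, mirroring the tightened predicate in the test.
        boolean done = retry(() -> {
            // Simulated progress on each poll (illustration only).
            if (unacked[0] > 0) unacked[0]--;
            if (delivered[0] < totalMsgs) delivered[0]++;
            return unacked[0] == 0 && delivered[0] == totalMsgs;
        }, 5, 10);
        System.out.println(done);
    }
}
```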
 

[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink

2018-08-27 Thread GitBox
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r213064055
 
 

 ##
 File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A file or comma separated list of files which contains the Hadoop file system configuration.
+ * Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file
+ * or will revert to a default configuration.
+ */
+private String hdfsConfigResources;
+
+/**
+ * The HDFS directory from which files should be read from or written to.
+ */
+private String directory;
+
+/**
+ * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+ */
+private String encoding;
+
+/**
+ * The compression codec used to compress/de-compress the files on HDFS.
+ */
+private Compression compression;
+
+/**
+ * The Kerberos user principal account to use for authentication.
+ */
+private String kerberosUserPrincipal;
+
+/**
+ * The full pathname to the Kerberos keytab file to use for authentication.
+ */
+private String keytab;
+
+public void validate() {
+if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
 
 Review comment:
   I will change the comment, since we cannot assume that this Sink will be executing on a node that has the Hadoop client configuration files available.
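The quoted diff stops at the `if` line, so the body of `validate()` is not shown. A minimal sketch of the check under discussion, assuming it rejects the configuration with an `IllegalArgumentException` when either `hdfsConfigResources` or `directory` is empty; the class name and the exception type are hypothetical. Keeping `hdfsConfigResources` mandatory follows the reply above: the sink may run on a node without Hadoop client configuration files on its classpath.

```java
// Hypothetical stand-in for AbstractHdfsConfig, reduced to the two fields in the check.
final class HdfsConfigSketch {
    String hdfsConfigResources; // file or comma-separated list of Hadoop config files
    String directory;           // HDFS directory to read from or write to

    // Same null-or-empty semantics as commons-lang StringUtils.isEmpty.
    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** Fails fast when either required setting is missing (exception type assumed). */
    void validate() {
        if (isEmpty(hdfsConfigResources) || isEmpty(directory)) {
            throw new IllegalArgumentException(
                "hdfsConfigResources and directory must both be set");
        }
    }
}
```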




[GitHub] srkukarni commented on a change in pull request #2409: Added HDFS Sink

2018-08-27 Thread GitBox
srkukarni commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r213060424
 
 

 ##
 File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
 ##
 @@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A file or comma separated list of files which contains the Hadoop file system configuration.
+ * Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file
+ * or will revert to a default configuration.
+ */
+private String hdfsConfigResources;
+
+/**
+ * The HDFS directory from which files should be read from or written to.
+ */
+private String directory;
+
+/**
+ * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+ */
+private String encoding;
+
+/**
+ * The compression codec used to compress/de-compress the files on HDFS.
+ */
+private Compression compression;
+
+/**
+ * The Kerberos user principal account to use for authentication.
+ */
+private String kerberosUserPrincipal;
+
+/**
+ * The full pathname to the Kerberos keytab file to use for authentication.
+ */
+private String keytab;
+
+public void validate() {
+if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
 
 Review comment:
   The comments for hdfsConfigResources indicate that if this is unset we look into certain places. Thus it looks like this check shouldn't be there?
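The `hdfsConfigResources` Javadoc describes "a file or comma separated list of files", with a classpath fallback to `core-site.xml` and `hdfs-site.xml` when it is unset. A sketch of splitting that string into individual paths, assuming blanks are ignored and an empty result means "fall back to the defaults"; the helper is hypothetical, and in a real sink each path might then be passed to Hadoop's `Configuration.addResource(Path)`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the hdfsConfigResources setting.
final class HdfsResourcesSketch {
    /** Splits a comma-separated list of config files, trimming entries and dropping blanks. */
    static List<String> parseResources(String hdfsConfigResources) {
        List<String> paths = new ArrayList<>();
        if (hdfsConfigResources == null) {
            return paths; // unset: caller would rely on classpath defaults
        }
        for (String part : hdfsConfigResources.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(parseResources("/etc/hadoop/core-site.xml, /etc/hadoop/hdfs-site.xml"));
        System.out.println(parseResources("").isEmpty());
    }
}
```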




[GitHub] merlimat commented on issue #2450: Added Reader.HasNext in Go client

2018-08-27 Thread GitBox
merlimat commented on issue #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416311755
 
 
   run cpp tests




[GitHub] merlimat commented on issue #2449: Support compaction options in Go client

2018-08-27 Thread GitBox
merlimat commented on issue #2449: Support compaction options in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2449#issuecomment-416311523
 
 
   run integration tests




[GitHub] merlimat opened a new pull request #2453: Fixed cpp multi-topic consumer when topics are not partitioned

2018-08-27 Thread GitBox
merlimat opened a new pull request #2453: Fixed cpp multi-topic consumer when topics are not partitioned
URL: https://github.com/apache/incubator-pulsar/pull/2453
 
 
   ### Motivation
   
   The multi-topics consumer in the C++ client does not correctly handle the case where topics are not partitioned.




[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053077
 
 

 ##
 File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 ##
 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List getFields();
 
 Review comment:
   I didn't add getAsString and getAsLong in this PR, because it is better to have primitive schemas (e.g. int, long, double, float, string, boolean) first. This generic interface allows unblocking the JDBC connector first.
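The interface under review exposes a record's fields, and the companion `Field` class describes a field by name, index, and schema. A dependency-free sketch of how a connector might walk such a record; the `getField(Field)` accessor, the map-backed implementation, and the omission of the per-field schema are assumptions for illustration, and typed getters such as `getAsString`/`getAsLong` are deliberately absent, as in the PR:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model; not the pulsar-client-schema classes.
final class GenericRecordSketch {
    /** A field identified by name and position (the per-field schema is omitted here). */
    static final class Field {
        final String name;
        final int index;

        Field(String name, int index) {
            this.name = name;
            this.index = index;
        }
    }

    interface GenericRecord {
        List<Field> getFields();

        /** Untyped accessor; an assumption, not part of the quoted diff. */
        Object getField(Field field);
    }

    /** Record backed by an insertion-ordered map of field name to value. */
    static final class MapRecord implements GenericRecord {
        private final List<Field> fields = new ArrayList<>();
        private final List<Object> values = new ArrayList<>();

        MapRecord(Map<String, Object> data) {
            int i = 0;
            for (Map.Entry<String, Object> e : data.entrySet()) {
                fields.add(new Field(e.getKey(), i++));
                values.add(e.getValue());
            }
        }

        @Override
        public List<Field> getFields() {
            return Collections.unmodifiableList(fields);
        }

        @Override
        public Object getField(Field field) {
            return values.get(field.index);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 42L);
        data.put("name", "alice");
        GenericRecord record = new MapRecord(data);
        // A sink such as a JDBC connector could map each field to a column by name.
        for (Field f : record.getFields()) {
            System.out.println(f.index + " " + f.name + " = " + record.getField(f));
        }
    }
}
```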




[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053108
 
 

 ##
 File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
 ##
 @@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, consisting of a field name, index, and
+ * {@link org.apache.pulsar.client.api.Schema} for the field value.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+private final String name;
 
 Review comment:
   will do




[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message

2018-08-27 Thread GitBox
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053077
 
 

 ##
 File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 ##
 @@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List getFields();
 
 Review comment:
   I didn't add getAsString and getAsLong in this PR, because it is better to have primitive schemas (e.g. int, long, double, float, string, boolean). This generic interface allows unblocking the JDBC connector first.




[GitHub] sijie commented on issue #2409: Added HDFS Sink

2018-08-27 Thread GitBox
sijie commented on issue #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#issuecomment-416303592
 
 
   @srkukarni @jerrypeng can you guys also review this PR?




[GitHub] merlimat commented on issue #2447: Added producer/consumer properties in Go client

2018-08-27 Thread GitBox
merlimat commented on issue #2447: Added producer/consumer properties in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2447#issuecomment-416292701
 
 
   run java8 tests




[GitHub] merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support

2018-08-27 Thread GitBox
merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213042192
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar_test.py
 ##
 @@ -650,6 +650,110 @@ def _v2_topics(self, url):
 
 client.close()
 
+def test_topics_consumer(self):
+client = Client(self.serviceUrl)
+topic1 = 'persistent://sample/standalone/ns/my-python-topics-consumer-1'
+topic2 = 'persistent://sample/standalone/ns/my-python-topics-consumer-2'
+topic3 = 'persistent://sample/standalone/ns/my-python-topics-consumer-3'
+topics = [topic1, topic2, topic3]
+
+url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions'
+url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions'
+url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions'
+
+doHttpPut(url1, '2')
+doHttpPut(url2, '3')
+doHttpPut(url3, '4')
 
 Review comment:
   As I mentioned in #2448, I think the multi-topic consumer in C++ is currently not working when there are no partitions in the topic.




[GitHub] merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support

2018-08-27 Thread GitBox
merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213041782
 
 

 ##
 File path: pulsar-client-cpp/python/pulsar/__init__.py
 ##
 @@ -503,6 +503,211 @@ def my_listener(consumer, message):
 self._consumers.append(c)
 return c
 
+def subscribe_topics(self, topics, subscription_name,
 
 Review comment:
   @jiazhai I'd prefer to keep a single `subscribe()` method. In Python we can check the type of the argument `topic` and detect whether it is a string, a list or a regex object.
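The suggestion is a single `subscribe()` entry point that picks single-topic, multi-topic, or pattern subscription from the runtime type of `topic` (in Python, an `isinstance` check). The same dispatch idea expressed in Java as a hypothetical sketch; the method name and the returned labels are illustrative only, and idiomatic Java would more often use overloads:

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical: a Java analogue of type-based dispatch in a single subscribe() method.
final class SubscribeDispatchSketch {
    /** Chooses a subscription kind from the runtime type of the topic argument. */
    static String subscriptionKind(Object topic) {
        if (topic instanceof String) {
            return "single-topic";
        } else if (topic instanceof List) {
            return "multi-topic";
        } else if (topic instanceof Pattern) {
            return "regex";
        }
        throw new IllegalArgumentException("unsupported topic argument: " + topic);
    }

    public static void main(String[] args) {
        System.out.println(subscriptionKind("persistent://sample/standalone/ns/t1"));
        System.out.println(subscriptionKind(List.of("t1", "t2")));
        System.out.println(subscriptionKind(Pattern.compile("persistent://sample/standalone/ns/.*")));
    }
}
```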




[GitHub] merlimat commented on issue #2450: Added Reader.HasNext in Go client

2018-08-27 Thread GitBox
merlimat commented on issue #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416289232
 
 
   run cpp tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

