[GitHub] aahmed-se commented on issue #2456: Remove Aerospike Connector
aahmed-se commented on issue #2456: Remove Aerospike Connector URL: https://github.com/apache/incubator-pulsar/pull/2456#issuecomment-416458802 Rebased on master This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aahmed-se commented on issue #2434: Add pulsar flink sink connector
aahmed-se commented on issue #2434: Add pulsar flink sink connector URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416458754 should be fixed
[GitHub] codelipenghui commented on a change in pull request #2460: Add backlog quota retention policy to server config.
codelipenghui commented on a change in pull request #2460: Add backlog quota retention policy to server config. URL: https://github.com/apache/incubator-pulsar/pull/2460#discussion_r213189938 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java ## @@ -86,6 +86,11 @@ private int backlogQuotaCheckIntervalInSeconds = 60; // Default per-topic backlog quota limit private long backlogQuotaDefaultLimitGB = 50; +//Default backlog quota retention policy. Default is producer_request_hold +//'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out) +//'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer +//'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog +private String backlogQuotaRetentionPolicy = "producer_request_hold"; Review comment: Yeah, there's already an enum in BacklogQuota named RetentionPolicy:

```java
public static enum RetentionPolicy {
    /** Policy which holds producer's send request until the resource becomes available (or holding times out) */
    producer_request_hold,
    /** Policy which throws javax.jms.ResourceAllocationException to the producer */
    producer_exception,
    /** Policy which evicts the oldest message from the slowest consumer's backlog */
    consumer_backlog_eviction,
}
```
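The review points out that `BacklogQuota` already defines a `RetentionPolicy` enum. Below is a minimal sketch, assuming the setting stays a `String` as in the diff, of how that value could be mapped onto the enum so that a misspelled policy fails fast instead of being silently accepted. The `BacklogQuotaPolicyParsing` class and the `parseRetentionPolicy` helper are hypothetical, and the enum is redeclared locally only so the example compiles without Pulsar on the classpath.

```java
import java.util.Locale;

public class BacklogQuotaPolicyParsing {

    // Local copy of the enum quoted in the review comment, so this sketch is self-contained.
    enum RetentionPolicy {
        producer_request_hold,
        producer_exception,
        consumer_backlog_eviction
    }

    // Hypothetical helper: maps the String setting onto the enum,
    // falling back to the default named in the diff's comment when unset.
    static RetentionPolicy parseRetentionPolicy(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return RetentionPolicy.producer_request_hold;
        }
        // valueOf throws IllegalArgumentException for unknown names,
        // so a typo in the configuration is reported at startup.
        return RetentionPolicy.valueOf(configured.trim().toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parseRetentionPolicy("consumer_backlog_eviction")); // consumer_backlog_eviction
        System.out.println(parseRetentionPolicy(null));                        // producer_request_hold
    }
}
```

An alternative in the spirit of the review is to declare the field with the enum type directly; whether Pulsar's configuration loader binds enum-typed fields is not shown in this thread.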
[GitHub] sijie commented on issue #2456: Remove Aerospike Connector
sijie commented on issue #2456: Remove Aerospike Connector URL: https://github.com/apache/incubator-pulsar/pull/2456#issuecomment-416456609 @aahmed-se you need to rebase your branch
[GitHub] sijie commented on issue #2380: Increase default maxConcurrentLookupRequests to 50k
sijie commented on issue #2380: Increase default maxConcurrentLookupRequests to 50k URL: https://github.com/apache/incubator-pulsar/pull/2380#issuecomment-416456387 run java8 tests
[GitHub] sijie closed pull request #2376: adding tiered storage confs to endpoint
sijie closed pull request #2376: adding tiered storage confs to endpoint URL: https://github.com/apache/incubator-pulsar/pull/2376 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index f6319aeaa7..f5240c989d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -62,7 +62,7 @@ import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.offload.TieredStorageConfigurationData; +import org.apache.pulsar.common.offload.TieredStorageConfigurationData; import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; @@ -660,14 +660,13 @@ public LedgerOffloader getManagedLedgerOffloader() { static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion"; static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha"; - public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) throws PulsarServerException { if (conf.getManagedLedgerOffloadDriver() != null && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) { try { return BlobStoreManagedLedgerOffloader.create( -getTieredStorageConf(conf), +getTieredStorageConf(), ImmutableMap.of( METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(), METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha() @@ -681,23 +680,23 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur } } -private static TieredStorageConfigurationData getTieredStorageConf(ServiceConfiguration serverConf) { +public TieredStorageConfigurationData getTieredStorageConf() { TieredStorageConfigurationData tsConf = new TieredStorageConfigurationData(); // generic settings - tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver()); - tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads()); + tsConf.setManagedLedgerOffloadDriver(config.getManagedLedgerOffloadDriver()); + tsConf.setManagedLedgerOffloadMaxThreads(config.getManagedLedgerOffloadMaxThreads()); // s3 settings - tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion()); - tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket()); - tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint()); - tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()); - tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes()); + tsConf.setS3ManagedLedgerOffloadRegion(config.getS3ManagedLedgerOffloadRegion()); + tsConf.setS3ManagedLedgerOffloadBucket(config.getS3ManagedLedgerOffloadBucket()); + tsConf.setS3ManagedLedgerOffloadServiceEndpoint(config.getS3ManagedLedgerOffloadServiceEndpoint()); + tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(config.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()); + tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(config.getS3ManagedLedgerOffloadReadBufferSizeInBytes()); // gcs settings - tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion()); - 
tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket()); - tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile()); - tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes()); - tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes()); + tsConf.setGcsManagedLedgerOffloadRegion(config.getGcsManagedLedgerOffloadRegion()); +
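The diff above turns `getTieredStorageConf` from a `private static` helper that takes a `ServiceConfiguration` argument into a `public` instance method that reads the service's own `config` field, which lets other components ask the service for its tiered storage settings without passing the configuration around. Below is a reduced sketch of that shape. The stand-in classes `ServerConfig`, `TieredConf`, and `Service` are hypothetical and carry only two of the settings.

```java
public class TieredStorageConfSketch {

    // Hypothetical stand-in for ServiceConfiguration with two offload settings.
    static class ServerConfig {
        private final String offloadDriver;
        private final String s3Bucket;

        ServerConfig(String offloadDriver, String s3Bucket) {
            this.offloadDriver = offloadDriver;
            this.s3Bucket = s3Bucket;
        }

        String getManagedLedgerOffloadDriver() { return offloadDriver; }
        String getS3ManagedLedgerOffloadBucket() { return s3Bucket; }
    }

    // Hypothetical stand-in for TieredStorageConfigurationData.
    static class TieredConf {
        String driver;
        String s3Bucket;
    }

    // Hypothetical stand-in for PulsarService: it owns its configuration as a field.
    static class Service {
        private final ServerConfig config;

        Service(ServerConfig config) { this.config = config; }

        // Instance method: copies settings from the service's own config,
        // so callers no longer need to supply a configuration argument.
        public TieredConf getTieredStorageConf() {
            TieredConf tsConf = new TieredConf();
            tsConf.driver = config.getManagedLedgerOffloadDriver();
            tsConf.s3Bucket = config.getS3ManagedLedgerOffloadBucket();
            return tsConf;
        }
    }

    public static void main(String[] args) {
        Service service = new Service(new ServerConfig("S3", "my-bucket"));
        TieredConf tsConf = service.getTieredStorageConf();
        System.out.println(tsConf.driver + " " + tsConf.s3Bucket); // S3 my-bucket
    }
}
```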
[GitHub] sijie commented on issue #2376: adding tiered storage confs to endpoint
sijie commented on issue #2376: adding tiered storage confs to endpoint URL: https://github.com/apache/incubator-pulsar/pull/2376#issuecomment-416456296 This is not needed any more.
[incubator-pulsar] branch master updated: Fix flake in DiscoveryServiceTest (#1081) (#2406)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 774cafa Fix flake in DiscoveryServiceTest (#1081) (#2406) 774cafa is described below commit 774cafa25d6340c778c5c281045446086aff96a0 Author: Ivan Kelly AuthorDate: Tue Aug 28 07:30:42 2018 +0200 Fix flake in DiscoveryServiceTest (#1081) (#2406) This test was flaking because it was only waiting for 1 second for connection and message exchange to complete, which is not enough time when there's heavy load on the machine (simulated with stress-ng). The fix is to increase the timeout to 10 seconds. I've also cleaned up the test to use a CompletableFuture rather than a CountDownLatch so that the test thread can be notified of failures in the handlers. --- .../discovery/service/DiscoveryServiceTest.java| 77 -- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java index 7d4a5ee..24bcd6a 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java @@ -21,7 +21,6 @@ package org.apache.pulsar.discovery.service; import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -31,16 +30,17 @@ import java.net.URISyntaxException; import java.security.PrivateKey; import java.security.cert.X509Certificate; import 
java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; @@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { +private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class); + private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; @@ -120,48 +125,40 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { /** * It verifies: client connects to Discovery-service and receives discovery response successfully. - * + * * @throws Exception */ @Test public void testClientServerConnection() throws Exception { addBrokerToZk(2); -// 1. client connects to DiscoveryService, 2. 
Client receive service-lookup response -final int messageTransfer = 2; -final CountDownLatch latch = new CountDownLatch(messageTransfer); -NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false); -try { -assertTrue(latch.await(1, TimeUnit.SECONDS)); -} catch (InterruptedException e) { -fail("should have received lookup response message from server", e); -} + +final CompletableFuture promise = new CompletableFuture<>(); +NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false); +assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); workerGroup.shutdownGracefully(); } @Test public void testClientServerConnectionTls() throws Exception { addBrokerToZk(2); -// 1. client connects to DiscoveryService, 2. Client receive service-lookup response -final int messageTransfer = 2; -final
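The commit message explains why a `CompletableFuture` suits this test better than a `CountDownLatch`: the handler can complete the future exceptionally, so the test thread sees the actual failure instead of a latch that never reaches zero. Below is a self-contained sketch of that pattern. A plain thread stands in for the Netty client handler, `simulateHandler` and `awaitResponse` are hypothetical names, and the 10-second timeout matches the one chosen in the fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FuturePromiseSketch {

    // Hypothetical stand-in for the client handler: runs on another thread and
    // reports either the response it received or the error that stopped it.
    static void simulateHandler(CompletableFuture<String> promise, boolean fail) {
        new Thread(() -> {
            try {
                if (fail) {
                    throw new IllegalStateException("handler failed to decode response");
                }
                promise.complete("CONNECTED");
            } catch (Exception e) {
                // A CountDownLatch cannot carry this error; the future hands it to the waiting thread.
                promise.completeExceptionally(e);
            }
        }).start();
    }

    // Waits up to 10 seconds; a handler failure surfaces as an ExecutionException.
    static String awaitResponse(boolean fail) throws InterruptedException, TimeoutException {
        CompletableFuture<String> promise = new CompletableFuture<>();
        simulateHandler(promise, fail);
        try {
            return promise.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitResponse(false)); // CONNECTED
        System.out.println(awaitResponse(true));  // failed: handler failed to decode response
    }
}
```

In a real test the `ExecutionException` would normally be left to propagate so the test fails with the handler's stack trace; it is caught here only to make the two outcomes easy to compare.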
[GitHub] sijie commented on issue #2434: Add pulsar flink sink connector
sijie commented on issue #2434: Add pulsar flink sink connector URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416456095 @aahmed-se there are some license header issues. PTAL

```
2018-08-27T19:50:00.378 [ERROR] Failed to execute goal com.mycila:license-maven-plugin:3.0.rc1:check (default-cli) on project pulsar-flink: Some files do not have the expected license header -> [Help 1]
```
[GitHub] sijie closed pull request #2406: Fix flake in DiscoveryServiceTest (#1081)
sijie closed pull request #2406: Fix flake in DiscoveryServiceTest (#1081) URL: https://github.com/apache/incubator-pulsar/pull/2406 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java index 7d4a5eeefc..24bcd6a302 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java @@ -21,7 +21,6 @@ import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -31,16 +30,17 @@ import java.security.PrivateKey; import java.security.cert.X509Certificate; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader; 
import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; @@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { +private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class); + private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; @@ -120,48 +125,40 @@ public void testGetPartitionsMetadata() throws Exception { /** * It verifies: client connects to Discovery-service and receives discovery response successfully. - * + * * @throws Exception */ @Test public void testClientServerConnection() throws Exception { addBrokerToZk(2); -// 1. client connects to DiscoveryService, 2. Client receive service-lookup response -final int messageTransfer = 2; -final CountDownLatch latch = new CountDownLatch(messageTransfer); -NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false); -try { -assertTrue(latch.await(1, TimeUnit.SECONDS)); -} catch (InterruptedException e) { -fail("should have received lookup response message from server", e); -} + +final CompletableFuture promise = new CompletableFuture<>(); +NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false); +assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); workerGroup.shutdownGracefully(); } @Test public void testClientServerConnectionTls() throws Exception { addBrokerToZk(2); -// 1. client connects to DiscoveryService, 2. 
Client receive service-lookup response -final int messageTransfer = 2; -final CountDownLatch latch = new CountDownLatch(messageTransfer); -NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true); -try { -assertTrue(latch.await(1, TimeUnit.SECONDS)); -} catch (InterruptedException e) { -fail("should have received lookup response message from server", e); -} + +final CompletableFuture promise = new CompletableFuture<>(); +NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), promise, true); +assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); workerGroup.shutdownGracefully(); } /** * creates ClientHandler channel to connect and communicate with server - * + * *
[GitHub] sijie commented on issue #2454: Fix compilation issue on CLITest
sijie commented on issue #2454: Fix compilation issue on CLITest URL: https://github.com/apache/incubator-pulsar/pull/2454#issuecomment-416455575 run java8 tests
[GitHub] sijie closed pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
sijie closed pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages URL: https://github.com/apache/incubator-pulsar/pull/2455 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md index 2433eb941f..421d45797b 100644 --- a/site2/docs/client-libraries-go.md +++ b/site2/docs/client-libraries-go.md @@ -34,7 +34,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar Or you can use [dep](https://github.com/golang/dep) for managing the dependencies. ```bash -$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}} +$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}} ``` Once installed locally, you can import it into your project: diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md index 78ea557b6f..b5793c6e4a 100644 --- a/site2/docs/deploy-bare-metal.md +++ b/site2/docs/deploy-bare-metal.md @@ -125,12 +125,12 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... 
``` diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md index 40904512d0..0dcf024a40 100644 --- a/site2/docs/getting-started-standalone.md +++ b/site2/docs/getting-started-standalone.md @@ -87,12 +87,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... ``` diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md index 042ce8e48b..914429b30b 100644 --- a/site2/docs/io-quickstart.md +++ b/site2/docs/io-quickstart.md @@ -69,12 +69,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... 
``` diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md b/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md index 04fdae5302..206f3e2a28 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md @@ -35,7 +35,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar Or you can use [dep](https://github.com/golang/dep) for managing the dependencies. ```bash -$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}} +$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}} ``` Once installed locally, you can import it into your project: diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md b/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md index 23b2121be8..feeb005cdc 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/deploy-bare-metal.md +++
[incubator-pulsar] branch master updated: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8da0669 Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455) 8da0669 is described below commit 8da066947b339d1f71025f8d3857bb7e5b9e70a2 Author: Matteo Merli AuthorDate: Mon Aug 27 22:26:10 2018 -0700 Fixed {{pulsar.version}} into {{pulsar:version}} in few pages (#2455) ### Motivation In few places in website we're displaying the literal `{{pulsar.version}}` instead of the actual Pulsar version string. The correct variable name is expected to use `:` as separator. --- site2/docs/client-libraries-go.md| 2 +- site2/docs/deploy-bare-metal.md | 12 ++-- site2/docs/getting-started-standalone.md | 12 ++-- site2/docs/io-quickstart.md | 12 ++-- .../version-2.1.0-incubating/client-libraries-go.md | 2 +- .../version-2.1.0-incubating/deploy-bare-metal.md| 12 ++-- .../version-2.1.0-incubating/getting-started-standalone.md | 12 ++-- .../versioned_docs/version-2.1.0-incubating/io-quickstart.md | 12 ++-- 8 files changed, 38 insertions(+), 38 deletions(-) diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md index 2433eb9..421d457 100644 --- a/site2/docs/client-libraries-go.md +++ b/site2/docs/client-libraries-go.md @@ -34,7 +34,7 @@ $ go get -u github.com/apache/incubator-pulsar/pulsar-client-go/pulsar Or you can use [dep](https://github.com/golang/dep) for managing the dependencies. 
```bash -$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar.version}} +$ dep ensure -add github.com/apache/incubator-pulsar/pulsar-client-go/pulsar@v{{pulsar:version}} ``` Once installed locally, you can import it into your project: diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md index 78ea557..b5793c6 100644 --- a/site2/docs/deploy-bare-metal.md +++ b/site2/docs/deploy-bare-metal.md @@ -125,12 +125,12 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... 
``` diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md index 4090451..0dcf024 100644 --- a/site2/docs/getting-started-standalone.md +++ b/site2/docs/getting-started-standalone.md @@ -87,12 +87,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... ``` diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md index 042ce8e..914429b 100644 --- a/site2/docs/io-quickstart.md +++ b/site2/docs/io-quickstart.md @@ -69,12 +69,12 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar.version}}.nar -pulsar-io-cassandra-{{pulsar.version}}.nar -pulsar-io-kafka-{{pulsar.version}}.nar -pulsar-io-kinesis-{{pulsar.version}}.nar -pulsar-io-rabbitmq-{{pulsar.version}}.nar -pulsar-io-twitter-{{pulsar.version}}.nar +pulsar-io-aerospike-{{pulsar:version}}.nar +pulsar-io-cassandra-{{pulsar:version}}.nar +pulsar-io-kafka-{{pulsar:version}}.nar +pulsar-io-kinesis-{{pulsar:version}}.nar +pulsar-io-rabbitmq-{{pulsar:version}}.nar +pulsar-io-twitter-{{pulsar:version}}.nar ... ``` diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/client-libraries-go.md
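According to the commit's motivation, the website's version variable must use `:` as the separator, which is why `{{pulsar.version}}` appeared literally on the page. Below is a minimal sketch of that behavior, assuming a renderer that only recognizes `{{namespace:name}}` keys. The pattern, the `render` helper, and the version value are hypothetical illustrations, not the website's actual build code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DocVariableSketch {

    // Hypothetical pattern: only keys of the form {{namespace:name}} count as variables.
    private static final Pattern VARIABLE = Pattern.compile("\\{\\{([a-z]+:[a-z]+)\\}\\}");

    static String render(String text, Map<String, String> vars) {
        Matcher m = VARIABLE.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Unknown keys are left as they were rather than replaced with an empty string.
            String value = vars.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Collections.singletonMap("pulsar:version", "2.1.0-incubating");
        System.out.println(render("pulsar-io-kafka-{{pulsar:version}}.nar", vars)); // pulsar-io-kafka-2.1.0-incubating.nar
        System.out.println(render("pulsar-io-kafka-{{pulsar.version}}.nar", vars)); // pulsar-io-kafka-{{pulsar.version}}.nar
    }
}
```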
[GitHub] sijie commented on a change in pull request #2459: Added Contributing page on website
sijie commented on a change in pull request #2459: Added Contributing page on website URL: https://github.com/apache/incubator-pulsar/pull/2459#discussion_r213187047 ## File path: site2/website/contributing.md ## @@ -0,0 +1,250 @@ + +The Apache Pulsar community welcomes contributions from anyone with a passion for distributed systems! Pulsar has many different opportunities for contributions -- +write new examples/tutorials, add new user-facing libraries, write new Pulsar IO connectors, or participate on the documentation effort. + +We use a review-then-commit workflow in Pulsar for all contributions. + +**For larger contributions or those that affect multiple components:** + +1. **Engage**: We encourage you to work with the Pulsar community on the + [Github Issues](https://github.com/apache/incubator-pulsar/issues) and + [developer’s mailing list](/contact) to identify + good areas for contribution. +1. **Design:** More complicated contributions will likely benefit from some early discussion in + order to scope and design them well. + +**For all contributions:** + +1. **Code:** The best part ;-) +1. **Review:** Submit a pull request with your contribution to our + [GitHub Repo](https://github.com/apache/incubator-pulsar). Work with a committer to review and + iterate on the code, if needed. +1. **Commit:** Once at least 2 Pulsar committers have approved the pull request, a Pulsar committer +will merge it into the master branch (and potentially backport to stable branches in case of +bug fixes). + +We look forward to working with you! + +## Engage + +### Mailing list(s) + +We discuss design and implementation issues on the [d...@pulsar.incubator.apache.org](mailto:d...@pulsar.incubator.apache.org) +mailing list, which is archived [here](https://lists.apache.org/list.html?d...@pulsar.apache.org). +Join by emailing [`dev-subscr...@pulsar.incubator.apache.org`](mailto:dev-subscr...@pulsar.incubator.apache.org). 
+ +If interested, you can also join the other [mailing lists](/contact). + +### Github Issues + +We are using [Github Issues](https://github.com/apache/incubator-pulsar/issues) as the issue tracking +and project management tool, as well as a way to communicate among a very diverse and distributed set +of contributors. To be able to gather feedback, avoid frustration, and avoid duplicated efforts all +Pulsar related work are being tracked there. + +If you do not already have an Github account, sign up [here](https://github.com/join). + +If a quick [search](https://github.com/apache/incubator-pulsar/issues) doesn’t turn up an existing +Github issue for the work you want to contribute, create it. Please discuss your idea with a +committer in Github or, alternatively, on the developer mailing list. + +If there’s an existing Github issue for your intended contribution, please comment about your intended +work. Once the work is understood, a committer will assign the issue to you. If an issue is currently +assigned, please check with the current assignee before reassigning. + +For moderate or large contributions, you should not start coding or writing a design document unless +there is a corresponding Github issue assigned to you for that work. Simple changes, like fixing typos, +do not require an associated issue. + +### Online discussions + +We are using [Apache Pulsar Slack channel](https://apache-pulsar.slack.com/) for online discussions. +You can self-invite yourself by accessing [this link](https://apache-pulsar.herokuapp.com/). + +Slack channels are great for quick questions or discussions on specialized topics. Remember that we +strongly encourage communication via the mailing lists, and we prefer to discuss more complex subjects +by email. Developers should be careful to move or duplicate all the official or useful discussions to +the issue tracking system and/or the dev mailing list. 
+ +## Design + +To avoid potential frustration during the code review cycle, we encourage you to clearly scope and +design non-trivial contributions with the BookKeeper community before you start coding. Review comment: Pulsar community This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2459: Added Contributing page on website
sijie commented on a change in pull request #2459: Added Contributing page on website URL: https://github.com/apache/incubator-pulsar/pull/2459#discussion_r213187066 ## File path: site2/website/contributing.md ## @@ -0,0 +1,250 @@ + +The Apache Pulsar community welcomes contributions from anyone with a passion for distributed systems! Pulsar has many different opportunities for contributions -- +write new examples/tutorials, add new user-facing libraries, write new Pulsar IO connectors, or participate on the documentation effort. + +We use a review-then-commit workflow in Pulsar for all contributions. + +**For larger contributions or those that affect multiple components:** + +1. **Engage**: We encourage you to work with the Pulsar community on the + [Github Issues](https://github.com/apache/incubator-pulsar/issues) and + [developer’s mailing list](/contact) to identify + good areas for contribution. +1. **Design:** More complicated contributions will likely benefit from some early discussion in + order to scope and design them well. + +**For all contributions:** + +1. **Code:** The best part ;-) +1. **Review:** Submit a pull request with your contribution to our + [GitHub Repo](https://github.com/apache/incubator-pulsar). Work with a committer to review and + iterate on the code, if needed. +1. **Commit:** Once at least 2 Pulsar committers have approved the pull request, a Pulsar committer +will merge it into the master branch (and potentially backport to stable branches in case of +bug fixes). + +We look forward to working with you! + +## Engage + +### Mailing list(s) + +We discuss design and implementation issues on the [d...@pulsar.incubator.apache.org](mailto:d...@pulsar.incubator.apache.org) +mailing list, which is archived [here](https://lists.apache.org/list.html?d...@pulsar.apache.org). +Join by emailing [`dev-subscr...@pulsar.incubator.apache.org`](mailto:dev-subscr...@pulsar.incubator.apache.org). 
+ +If interested, you can also join the other [mailing lists](/contact). + +### Github Issues + +We are using [Github Issues](https://github.com/apache/incubator-pulsar/issues) as the issue tracking +and project management tool, as well as a way to communicate among a very diverse and distributed set +of contributors. To be able to gather feedback, avoid frustration, and avoid duplicated efforts all +Pulsar related work are being tracked there. + +If you do not already have an Github account, sign up [here](https://github.com/join). + +If a quick [search](https://github.com/apache/incubator-pulsar/issues) doesn’t turn up an existing +Github issue for the work you want to contribute, create it. Please discuss your idea with a +committer in Github or, alternatively, on the developer mailing list. + +If there’s an existing Github issue for your intended contribution, please comment about your intended +work. Once the work is understood, a committer will assign the issue to you. If an issue is currently +assigned, please check with the current assignee before reassigning. + +For moderate or large contributions, you should not start coding or writing a design document unless +there is a corresponding Github issue assigned to you for that work. Simple changes, like fixing typos, +do not require an associated issue. + +### Online discussions + +We are using [Apache Pulsar Slack channel](https://apache-pulsar.slack.com/) for online discussions. +You can self-invite yourself by accessing [this link](https://apache-pulsar.herokuapp.com/). + +Slack channels are great for quick questions or discussions on specialized topics. Remember that we +strongly encourage communication via the mailing lists, and we prefer to discuss more complex subjects +by email. Developers should be careful to move or duplicate all the official or useful discussions to +the issue tracking system and/or the dev mailing list. 
+ +## Design + +To avoid potential frustration during the code review cycle, we encourage you to clearly scope and +design non-trivial contributions with the BookKeeper community before you start coding. + +We are using "Pulsar Improvement Proposals" (or "PIP") for managing major changes to Pulsar. The +list of all PIPs is maintained in the Pulsar wiki at https://github.com/apache/incubator-pulsar/wiki. + +## Code + +To contribute code to Apache Pulsar, you’ll have to do a few administrative steps once, and then +follow the [Coding Guide](../coding_guide). + +### One-time Setup + + [Optionally] Submit Contributor License Agreement + +Apache Software Foundation (ASF) desires that all contributors of ideas, code, or documentation to the Apache projects complete, sign, and submit an [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf) (ICLA). The purpose of this agreement is to clearly define the terms under which intellectual property has been contributed to the ASF and thereby allow us to defend the
[incubator-pulsar] branch master updated: [Schema] Provide a generic record interface for representing a typed message (#2452)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 757222d [Schema] Provide a generic record interface for representing a typed message (#2452) 757222d is described below commit 757222d9d232dbe03293df890609aa85ff63556c Author: Sijie Guo AuthorDate: Mon Aug 27 22:21:27 2018 -0700 [Schema] Provide a generic record interface for representing a typed message (#2452) * [Schema] Provide a generic record interface for representing a typed message ### Motivation In some use cases, the publishers and consumers don't know the type or schema of the messages ahead of time. For example, in a Pulsar IO connector that connects a topic to a JDBC table, the connector doesn't know the type of the messages ahead of time; it can only fetch the schema info from the schema registry, and that is the only information the connector has. Without knowing the type, it is impossible to map the messages to a relational database table. So we need a way to represent a generic `Struct` record with fields. ### Changes Introduce `Field` and `GenericRecord` to represent `Struct` records deserialized with a schema. ### NotCovered This change only introduces the interfaces. It doesn't integrate with the producer and consumer workflow. That would be done in subsequent changes if we agree on the interfaces.
--- .../org/apache/pulsar/client/api/schema/Field.java | 43 + .../pulsar/client/api/schema/GenericRecord.java| 51 +++ .../pulsar/client/impl/schema/AvroSchema.java | 17 ++-- .../client/impl/schema/GenericAvroRecord.java | 79 .../client/impl/schema/GenericAvroSchema.java | 102 + .../pulsar/client/schema/AvroSchemaTest.java | 47 +- 6 files changed, 329 insertions(+), 10 deletions(-) diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java new file mode 100644 index 000..653b5d6 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * A field in a record, consisting of a field name, index, and + * {@link org.apache.pulsar.client.api.Schema} for the field value. + */ +@Data +@EqualsAndHashCode +@ToString +public class Field { + +/** + * The field name. + */ +private final String name; +/** + * The index of the field within the record. 
+ */ +private final int index; + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java new file mode 100644 index 000..0a4fce4 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema;
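The new `Field` class gets its boilerplate from Lombok: `@Data` already bundles `@Getter`, `@Setter` (for non-final fields only), `@RequiredArgsConstructor`, `@ToString` and `@EqualsAndHashCode`. For readers unfamiliar with Lombok, the class below is a rough, hand-written approximation of what those annotations produce for two `final` fields. `FieldSketch` is a hypothetical name, and the code Lombok actually generates differs in detail (for example, its `hashCode` formula).

```java
import java.util.Objects;

/**
 * Hypothetical hand-written approximation of a Lombok @Data class with two
 * final fields: getters, a required-args constructor, equals, hashCode, toString.
 */
final class FieldSketch {
    private final String name; // the field name
    private final int index;   // position of the field within the record

    // @RequiredArgsConstructor: one parameter per final field
    public FieldSketch(String name, int index) {
        this.name = name;
        this.index = index;
    }

    public String getName() { return name; }

    public int getIndex() { return index; }

    // No setters are generated, because both fields are final.

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FieldSketch)) return false;
        FieldSketch other = (FieldSketch) o;
        return index == other.index && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, index);
    }

    // Same "ClassName(field=value, ...)" shape that Lombok's @ToString uses
    @Override
    public String toString() {
        return "FieldSketch(name=" + name + ", index=" + index + ")";
    }
}
```

Two instances with the same name and index compare equal, which is what lets a `Field` be used as a lookup key.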
[GitHub] sijie closed pull request #2452: [Schema] Provide a generic record interface for representing a typed message
sijie closed pull request #2452: [Schema] Provide a generic record interface for representing a typed message URL: https://github.com/apache/incubator-pulsar/pull/2452 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java new file mode 100644 index 00..653b5d6090 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * A field in a record, consisting of a field name, index, and + * {@link org.apache.pulsar.client.api.Schema} for the field value. + */ +@Data +@EqualsAndHashCode +@ToString +public class Field { + +/** + * The field name. 
+ */ +private final String name; +/** + * The index of the field within the record. + */ +private final int index; + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java new file mode 100644 index 00..0a4fce43cb --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import java.util.List; + +/** + * An interface represents a message with schema. + */ +public interface GenericRecord { + +/** + * Returns the list of fields associated with the record. + * + * @return the list of fields associated with the record. + */ +List getFields(); + +/** + * Retrieve the value of the provided field. + * + * @param field the field to retrieve the value + * @return the value object + */ +Object getField(Field field); + +/** + * Retrieve the value of the provided fieldName. 
+ * + * @param fieldName the field name + * @return the value object + */ +Object getField(String fieldName); + +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index 7d90d2bf69..6867fdcb4d 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -45,8 +45,9 @@ private BinaryEncoder encoder; private ByteArrayOutputStream byteArrayOutputStream; -private AvroSchema(Class pojo, Map properties) { -this.schema = ReflectData.AllowNull.get().getSchema(pojo); +private AvroSchema(org.apache.avro.Schema schema, + Map properties) { +this.schema = schema; this.schemaInfo = new SchemaInfo(); this.schemaInfo.setName(""); @@ -61,8 +62,7 @@ private AvroSchema(Class
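The motivating case for #2452 is a connector, such as a JDBC sink, that only learns a message's structure at runtime. Below is a minimal sketch of how such code could consume the `GenericRecord` interface shown in the diff. It assumes local stand-ins for `Field` and `GenericRecord`, so it compiles without the Pulsar client; the generic type parameters stripped by the archive are restored here as an assumption. It also assumes a hypothetical map-backed `MapGenericRecord` and a hypothetical `InsertStatementSketch` helper. None of this is part of the PR, which only introduces the interfaces.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Local stand-in for the Field class in the diff (no Lombok).
final class Field {
    private final String name;
    private final int index;

    Field(String name, int index) {
        this.name = name;
        this.index = index;
    }

    public String getName() { return name; }

    public int getIndex() { return index; }
}

// Local stand-in for the GenericRecord interface in the diff.
interface GenericRecord {
    List<Field> getFields();

    Object getField(Field field);

    Object getField(String fieldName);
}

// Hypothetical implementation: field order follows the map's insertion order.
final class MapGenericRecord implements GenericRecord {
    private final List<Field> fields = new ArrayList<>();
    private final List<Object> values = new ArrayList<>();

    MapGenericRecord(LinkedHashMap<String, Object> data) {
        int i = 0;
        for (Map.Entry<String, Object> e : data.entrySet()) {
            fields.add(new Field(e.getKey(), i++));
            values.add(e.getValue());
        }
    }

    @Override
    public List<Field> getFields() { return Collections.unmodifiableList(fields); }

    @Override
    public Object getField(Field field) { return values.get(field.getIndex()); }

    @Override
    public Object getField(String fieldName) {
        for (Field f : fields) {
            if (f.getName().equals(fieldName)) {
                return getField(f);
            }
        }
        return null; // assumption: unknown field names yield null
    }
}

// Hypothetical helper: maps any record to a parameterized INSERT using only its fields.
final class InsertStatementSketch {
    static String insertSql(String table, GenericRecord record) {
        List<Field> fs = record.getFields();
        String columns = fs.stream().map(Field::getName).collect(Collectors.joining(", "));
        String placeholders = fs.stream().map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    // Values in the same order as the columns produced by insertSql.
    static List<Object> bindValues(GenericRecord record) {
        List<Object> out = new ArrayList<>();
        for (Field f : record.getFields()) {
            out.add(record.getField(f));
        }
        return out;
    }
}
```

Because the helper only walks `getFields()`, the same code handles any schema. In a real sink, the values from `bindValues` would then be bound to a JDBC `PreparedStatement`.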
[GitHub] merlimat commented on a change in pull request #2460: Add backlog quota retention policy to server config.
merlimat commented on a change in pull request #2460: Add backlog quota retention policy to server config. URL: https://github.com/apache/incubator-pulsar/pull/2460#discussion_r213179479 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java ## @@ -86,6 +86,11 @@ private int backlogQuotaCheckIntervalInSeconds = 60; // Default per-topic backlog quota limit private long backlogQuotaDefaultLimitGB = 50; +//Default backlog quota retention policy. Default is producer_request_hold +//'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out) +//'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer +//'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog +private String backlogQuotaRetentionPolicy = "producer_request_hold"; Review comment: This could be set with the enum so that we can validate the value when loading the config file. Also, similar to the `backlogQuotaDefaultLimitGB` this config var could be called `backlogQuotaDefaultRetentionPolicy`
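The enum suggestion can be sketched in isolation: parse the config value straight into the enum so that a typo is rejected when the file is loaded, not when the quota is enforced. This standalone sketch assumes a `java.util.Properties` source and a hypothetical `BacklogQuotaConfigSketch` holder using the key name proposed in the review; it does not reproduce Pulsar's own configuration loader. The enum constants are the three policies named in the diff comments.

```java
import java.util.Arrays;
import java.util.Properties;

/** The three policies listed in the diff comments (lowercase, matching the config values). */
enum RetentionPolicy {
    producer_request_hold,     // hold the producer's send request until space frees up (or it times out)
    producer_exception,        // throw an exception to the producer
    consumer_backlog_eviction  // evict the oldest message from the slowest consumer's backlog
}

/** Hypothetical config holder; only the one setting discussed in the review is modeled. */
final class BacklogQuotaConfigSketch {
    static final String KEY = "backlogQuotaDefaultRetentionPolicy";

    // Enum-typed field with the PR's default, instead of a free-form String.
    private RetentionPolicy backlogQuotaDefaultRetentionPolicy = RetentionPolicy.producer_request_hold;

    public RetentionPolicy getBacklogQuotaDefaultRetentionPolicy() {
        return backlogQuotaDefaultRetentionPolicy;
    }

    /** Reads the setting, keeping the default when absent and failing fast on unknown values. */
    static BacklogQuotaConfigSketch load(Properties props) {
        BacklogQuotaConfigSketch conf = new BacklogQuotaConfigSketch();
        String raw = props.getProperty(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return conf; // not set: keep producer_request_hold
        }
        try {
            conf.backlogQuotaDefaultRetentionPolicy = RetentionPolicy.valueOf(raw.trim());
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid " + KEY + " '" + raw + "'; expected one of "
                    + Arrays.toString(RetentionPolicy.values()), e);
        }
        return conf;
    }
}
```

`Enum.valueOf` is case-sensitive, so the config value must match a constant exactly. The error message lists the accepted values so the operator can fix the file.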
[GitHub] codelipenghui opened a new pull request #2460: Add backlog quota retention policy to server config.
codelipenghui opened a new pull request #2460: Add backlog quota retention policy to server config. URL: https://github.com/apache/incubator-pulsar/pull/2460 ### Motivation Enable setting the default backlog quota retention policy from the server config file (broker.conf). ### Modifications Add backlogQuotaRetentionPolicy to broker.conf. Add backlogQuotaRetentionPolicy to ServiceConfiguration. Modify the BacklogQuotaManager creation logic: when backlogQuotaRetentionPolicy in ServiceConfiguration is not null, BacklogQuotaManager uses it as the default retention policy.
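The fallback described in the PR is small enough to sketch: use the configured policy when it is set, otherwise the built-in default. `DefaultBacklogQuota` and `fromConfig` are hypothetical names, and the fallback `producer_request_hold` is the default stated in the PR's diff. Converting `backlogQuotaDefaultLimitGB` to bytes with binary gigabytes (1024³) is an assumption about how the limit is interpreted.

```java
/** Same three policies as in the PR's config comments. */
enum RetentionPolicy {
    producer_request_hold,
    producer_exception,
    consumer_backlog_eviction
}

/** Hypothetical value object for the broker-wide default backlog quota. */
final class DefaultBacklogQuota {
    // Built-in fallback, matching the default stated in the PR.
    static final RetentionPolicy FALLBACK_POLICY = RetentionPolicy.producer_request_hold;
    // Assumption: the GB setting means binary gigabytes.
    private static final long BYTES_PER_GB = 1024L * 1024L * 1024L;

    final long limitBytes;
    final RetentionPolicy policy;

    private DefaultBacklogQuota(long limitBytes, RetentionPolicy policy) {
        this.limitBytes = limitBytes;
        this.policy = policy;
    }

    /** Uses the configured policy when it is non-null, otherwise the fallback. */
    static DefaultBacklogQuota fromConfig(long limitGB, RetentionPolicy configuredPolicy) {
        RetentionPolicy policy = (configuredPolicy != null) ? configuredPolicy : FALLBACK_POLICY;
        return new DefaultBacklogQuota(limitGB * BYTES_PER_GB, policy);
    }
}
```

Take the 50 GB default from `ServiceConfiguration` with no policy configured. `fromConfig(50, null)` then yields a limit of 53687091200 bytes with `producer_request_hold`.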
[GitHub] merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages URL: https://github.com/apache/incubator-pulsar/pull/2455#issuecomment-416441172 run cpp tests
[GitHub] merlimat commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message
merlimat commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message URL: https://github.com/apache/incubator-pulsar/pull/2452#issuecomment-416441285 run integration tests
[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics
rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416422769 rerun cpp tests
[incubator-pulsar.wiki] branch master updated: Created Apache Maturity Model Assessment for Pulsar (markdown)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new b4cacd5 Created Apache Maturity Model Assessment for Pulsar (markdown) b4cacd5 is described below commit b4cacd5edd84de42d9b6c89bf6155f27d258b56f Author: Matteo Merli AuthorDate: Mon Aug 27 18:11:05 2018 -0700 Created Apache Maturity Model Assessment for Pulsar (markdown) --- Apache-Maturity-Model-Assessment-for-Pulsar.md | 40 ++ 1 file changed, 40 insertions(+) diff --git a/Apache-Maturity-Model-Assessment-for-Pulsar.md b/Apache-Maturity-Model-Assessment-for-Pulsar.md new file mode 100644 index 000..2074b03 --- /dev/null +++ b/Apache-Maturity-Model-Assessment-for-Pulsar.md @@ -0,0 +1,40 @@
+| ID | Description | Status |
+|:---|:---|:---|
+| ***Code*** | | |
+| CD10 | The project produces Open Source software, for distribution to the public at no charge. | **YES** The project source code is licensed under the Apache License, version 2.0. |
+| CD20 | The project's code is easily discoverable and publicly accessible. | **YES** Linked from the website, available via GitBox https://gitbox.apache.org/repos/asf?p=incubator-pulsar.git and https://github.com/apache/incubator-pulsar. |
+| CD30 | The code can be built in a reproducible way using widely available standard tools. | **YES** The build uses Apache Maven for Java code and CMake for C++ code. Continuous integration is used. |
+| CD40 | The full history of the project's code is available via a source code control system, in a way that allows any released version to be recreated. | **YES** All the history of the project is available through Git. All releases are properly tagged. |
+| CD50 | The provenance of each line of code is established via the source code control system, in a reliable way based on strong authentication of the committer. When third-party contributions are committed, commit messages provide reliable information about the code provenance. | **YES** The git repository is managed by Apache Infra. Only Pulsar committers have write access. All code is checked in after at least 2 committers have approved a pull-request. For 3rd party contribution [...]
+| ***Licenses and Copyright*** | | |
+| LC10 | The code is released under the Apache License, version 2.0. | **YES** Source distributions clearly state license. Convenience binaries clearly state license. |
+| LC20 | Libraries that are mandatory dependencies of the project's code do not create more restrictions than the Apache License does. | **YES** The list of mandatory dependencies has been reviewed to contain approved licenses only. |
+| LC30 | The libraries mentioned in LC20 are available as Open Source software. | **YES** All dependencies are available as open source software. |
+| LC40 | Committers are bound by an Individual Contributor Agreement (the "Apache iCLA") that defines which code they are allowed to commit and how they need to identify code that is not their own. | **YES** The project uses a repository managed by Apache Infra; write access requires an Apache account, which requires an ICLA on file. |
+| LC50 | The copyright ownership of everything that the project produces is clearly defined and documented. | **YES** A Software Grant Agreement for the initial donation from Yahoo was filed. All files in the source repository have appropriate headers. An automated process is in place to ensure every file has the expected headers. |
+| ***Releases*** | | |
+| RE10 | Releases consist of source code, distributed using standard and open archive formats that are expected to stay readable in the long term. | **YES** Source releases are distributed via https://dist.apache.org/repos/dist/release/incubator/pulsar/ and linked from the website at https://pulsar.incubator.apache.org/download/.
+| RE20 | Releases are approved by the project's PMC (see CS10), in order to make them an act of the Foundation. | **YES** All incubating releases have been approved by the Pulsar community with at least 3 PPMC votes and by the Incubator with 3 IPMC votes. |
+| RE30 | Releases are signed and/or distributed along with digests that can be reliably used to validate the downloaded archives. | **YES** All releases are signed, and the KEYS file is provided on dist.apache.org. |
+| RE40 | Convenience binaries can be distributed alongside source code but they are not Apache Releases -- they are just a convenience provided with no guarantee. | **YES** Convenience binaries are distributed via dist.apache.org. Java binary artifacts are also distributed through Maven Central Repository. |
+| RE50 | The release process is documented and repeatable to the extent that someone new to
[GitHub] merlimat opened a new pull request #2459: Added Contributing page on website
merlimat opened a new pull request #2459: Added Contributing page on website URL: https://github.com/apache/incubator-pulsar/pull/2459 ### Motivation Added a Contributing page to the website. It is inspired by and adapted from the BookKeeper contributing guide: http://bookkeeper.apache.org/community/contributing/
[incubator-pulsar] branch master updated: Remove schema validator from annotations since it requires slightly more information (#2445)
This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new b234f33 Remove schema validator from annotations since it requires slightly more information (#2445) b234f33 is described below commit b234f330ec6600c7ffd2201d4360182a5bd4d0b7 Author: Sanjeev Kulkarni AuthorDate: Mon Aug 27 17:42:35 2018 -0700 Remove schema validator from annotations since it requires slightly more information (#2445) and is therefore done explicitly --- .../main/java/org/apache/pulsar/functions/utils/FunctionConfig.java| 3 --- .../src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java| 3 +-- .../src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java | 1 - 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 63d8dd8..ac96fa9 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -29,7 +29,6 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidation; @@ -82,8 +81,6 @@ public class FunctionConfig { private Map customSerdeInputs; @isValidTopicName private String topicsPattern; -@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, -valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA) @isMapEntryCustom(keyValidatorClasses = {
ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON) private Map customSchemaInputs; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index 6eb96c3..be886c4 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -66,8 +66,7 @@ public class SinkConfig { @isValidTopicName private String topicsPattern; -@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, -valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }) +@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }) private Map topicToSchemaType; private Map inputSpecs = new TreeMap<>(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java index 5807b40..38e200a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java @@ -58,7 +58,6 @@ public class SourceConfig { @isImplementationOfClass(implementsClass = SerDe.class) private String serdeClassName; -@isImplementationOfClass(implementsClass = Schema.class) private String schemaType; private Map configs;
[GitHub] srkukarni closed pull request #2445: Remove schema validator from annotations
srkukarni closed pull request #2445: Remove schema validator from annotations URL: https://github.com/apache/incubator-pulsar/pull/2445 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 63d8dd847a..ac96fa90a7 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -29,7 +29,6 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.utils.validation.ConfigValidation; @@ -82,8 +81,6 @@ private Map customSerdeInputs; @isValidTopicName private String topicsPattern; -@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, -valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }, targetRuntime = ConfigValidation.Runtime.JAVA) @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, targetRuntime = ConfigValidation.Runtime.PYTHON) private Map customSchemaInputs; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index 6eb96c39e0..be886c41ae 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -66,8 +66,7 @@ @isValidTopicName private 
String topicsPattern; -@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, -valueValidatorClasses = { ValidatorImpls.SchemaValidator.class }) +@isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }) private Map topicToSchemaType; private Map inputSpecs = new TreeMap<>(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java index 5807b403d6..38e200aefb 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfig.java @@ -58,7 +58,6 @@ @isImplementationOfClass(implementsClass = SerDe.class) private String serdeClassName; -@isImplementationOfClass(implementsClass = Schema.class) private String schemaType; private Map configs;
[GitHub] asfgit commented on issue #2457: Updated documentation about changed arguments for functions, sources and sinks
asfgit commented on issue #2457: Updated documentation about changed arguments for functions, sources and sinks URL: https://github.com/apache/incubator-pulsar/pull/2457#issuecomment-416409829 SUCCESS --none--
[GitHub] asfgit commented on issue #2453: Fixed cpp multi-topic consumer when topics are not partitioned
asfgit commented on issue #2453: Fixed cpp multi-topic consumer when topics are not partitioned URL: https://github.com/apache/incubator-pulsar/pull/2453#issuecomment-416408967 SUCCESS --none--
[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213151503 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java ## @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(kafkaServiceName) .withHostName(clusterName + "-" + kafkaServiceName))); + final String cassandraServiceName = "cassandra"; externalServices.put( cassandraServiceName, new CassandraContainer(clusterName)); + +// use mySQL for jdbc test +final String jdbcServiceName = "jdbc"; +externalServices.put( +jdbcServiceName, +new MySQLContainer() Review comment: Thanks, this is already done in PulsarCluster
[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213151155 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java ## @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils + */ +@Slf4j +public class JdbcUtils { + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableId { +private final String catalogName; +private final String schemaName; +private final String tableName; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class ColumnId { +private final TableId tableId; +private final String name; +// SQL type from java.sql.Types +private final int type; +private final String typeName; +// column position in table +private final int position; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableDefinition { +private final TableId tableId; +private final List columns; +} + +/** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types + */ +public static String getDriverClassName(String driver) throws Exception { +if (driver.equals("mysql")) { +return "com.mysql.jdbc.Driver"; +} if (driver.equals("sqlite")) { +return "org.sqlite.JDBC"; +} else { +throw new Exception("Not tested jdbc driver type: " + driver); Review comment: Thanks. We had better test each type before adding it; failing early is better. For each new type, we should also add its dependency to the pom file.
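The fail-fast lookup the review asks for can be written as an explicit allow-list. Below is a minimal sketch, assuming only the two driver types from the diff (`mysql`, `sqlite`) are supported. `JdbcDriverLookup` is a hypothetical name. A `switch` replaces the `if` / `} if` / `else` chain, whose missing `else` is harmless only because the first branch returns.

```java
// Hypothetical helper mirroring getDriverClassName/getConnection from the diff.
final class JdbcDriverLookup {

    private JdbcDriverLookup() {
    }

    /** Extracts the subprotocol, e.g. "mysql" from "jdbc:mysql://localhost:3306/db". */
    static String driverType(String jdbcUrl) {
        String[] parts = jdbcUrl.split(":");
        if (parts.length < 2 || !"jdbc".equals(parts[0])) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        return parts[1];
    }

    /** Returns the driver class for a tested type, failing early for anything else. */
    static String driverClassName(String driverType) {
        switch (driverType) {
            case "mysql":
                return "com.mysql.jdbc.Driver";
            case "sqlite":
                return "org.sqlite.JDBC";
            default:
                // Untested types are rejected instead of guessed; each new type would
                // also need its driver dependency added to the connector's pom.
                throw new IllegalArgumentException("Not tested jdbc driver type: " + driverType);
        }
    }
}
```

Throwing `IllegalArgumentException` instead of a bare `Exception` lets callers distinguish a configuration mistake from a connection failure.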
[GitHub] rdhabalia removed a comment on issue #2438: Fix: Function assignment can support large number of topics
rdhabalia removed a comment on issue #2438: Fix: Function assignment can support large number of topics URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416407418 rerun cpp tests
[GitHub] rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics
rdhabalia commented on issue #2438: Fix: Function assignment can support large number of topics URL: https://github.com/apache/incubator-pulsar/pull/2438#issuecomment-416407418 rerun cpp tests
[GitHub] merlimat opened a new pull request #2458: Fixed few links in contact and download pages
merlimat opened a new pull request #2458: Fixed few links in contact and download pages URL: https://github.com/apache/incubator-pulsar/pull/2458 ### Motivation Fixed a few links in the contact and download pages.
[GitHub] jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
jiazhai commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213150850 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink implements Sink { +// - Runtime fields +private JdbcSinkConfig jdbcSinkConfig; +@Getter +private Connection connection; +private String tableName; + +private JdbcUtils.TableId tableId; +private PreparedStatement insertStatement; + +// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) +protected String schema; +protected JdbcUtils.TableDefinition tableDefinition; + +// for flush +private List> incomingList; +private List> swapList; +private AtomicBoolean isFlushing; +private int timeoutMs; +private int batchSize; +private ScheduledExecutorService flushExecutor; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +jdbcSinkConfig = JdbcSinkConfig.load(config); + +String jdbcUrl = jdbcSinkConfig.getJdbcUrl(); +if (jdbcSinkConfig.getJdbcUrl() == null) { +throw new IllegalArgumentException("Required jdbc Url not set."); +} + +Properties properties = new Properties(); +String username = jdbcSinkConfig.getUserName(); +String password = jdbcSinkConfig.getPassword(); +if (username != null) { +properties.setProperty("user", username); +} +if 
(password != null) { +properties.setProperty("password", password); +} + +connection = JdbcUtils.getConnection(jdbcUrl, properties); +log.info("Connection opened"); + +schema = jdbcSinkConfig.getSchema(); +tableName = jdbcSinkConfig.getTableName(); +tableId = JdbcUtils.getTableId(connection, tableName); +tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); +insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + +timeoutMs = jdbcSinkConfig.getTimeoutMs(); +batchSize = jdbcSinkConfig.getBatchSize(); +incomingList = Lists.newArrayList(); +swapList = Lists.newArrayList(); +isFlushing = new AtomicBoolean(false); + +flushExecutor = Executors.newScheduledThreadPool(1); +flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); +} + +@Override +public void close() throws Exception { +if (!connection.getAutoCommit()) { +connection.commit(); +} +flushExecutor.shutdown(); +if (connection != null) { +connection.close(); +} +log.info("Connection Closed"); +} + +@Override +public void write(Record record) throws Exception { +int number; +synchronized (incomingList) { +incomingList.add(record); +number = incomingList.size(); +} + +if (number == batchSize) { +flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS); +} +} + +// bind value with a
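`JdbcAbstractSink` buffers records in `incomingList` and swaps that list with `swapList` when flushing. `isFlushing` ensures only one flush runs at a time. A flush is triggered either by the fixed-rate timer (`timeoutMs`) or when the batch reaches `batchSize`. The `flush()` body is cut off in this excerpt, so the following is only a minimal sketch of that double-buffering idea under stated assumptions, not the PR's implementation. `SwapBatcher` and its `Consumer` sink are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical double-buffered batcher modeled on incomingList/swapList/isFlushing.
final class SwapBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private List<T> incoming = new ArrayList<>(); // guarded by "this"
    private List<T> swap = new ArrayList<>();     // only touched by the active flusher

    SwapBatcher(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item; returns true when the caller should trigger a flush. */
    boolean add(T item) {
        synchronized (this) {
            incoming.add(item);
            return incoming.size() >= batchSize;
        }
    }

    /** Swaps the buffers and hands the filled one to the sink; calls made during a flush return immediately. */
    void flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another thread is already flushing
        }
        try {
            synchronized (this) {
                List<T> tmp = incoming;
                incoming = swap;
                swap = tmp;
            }
            // Writers now fill the other list, so this one can be drained without the lock.
            if (!swap.isEmpty()) {
                sink.accept(new ArrayList<>(swap)); // defensive copy: swap is cleared and reused
            }
            swap.clear();
        } finally {
            isFlushing.set(false);
        }
    }
}
```

One deliberate difference: `add` reports a full batch with `>=`, whereas `write()` uses `number == batchSize`. Assuming the truncated `flush()` returns early while `isFlushing` is set, the `==` test can miss the threshold. If records arrive during a flush and the list grows past `batchSize`, no size-based flush fires again, and those records wait for the next timer tick. With `>=`, extra flush requests become no-ops instead.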
[GitHub] srkukarni opened a new pull request #2457: Updated documentation about changed arguments for functions, sources and sinks
srkukarni opened a new pull request #2457: Updated documentation about changed arguments for functions, sources and sinks URL: https://github.com/apache/incubator-pulsar/pull/2457 ### Motivation We recently changed the argument style of functions, sources, and sinks to match the rest of the Pulsar CLI. This PR updates the documentation accordingly. ### Modifications Describe the modifications you've done. ### Result After your change, what will change.
[GitHub] merlimat closed pull request #2448: Add multi-topic and regex consumer in Go client
merlimat closed pull request #2448: Add multi-topic and regex consumer in Go client URL: https://github.com/apache/incubator-pulsar/pull/2448 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h index 2da7c6de92..4b603bbff3 100644 --- a/pulsar-client-cpp/include/pulsar/c/client.h +++ b/pulsar-client-cpp/include/pulsar/c/client.h @@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c const pulsar_consumer_configuration_t *conf, pulsar_subscribe_callback callback, void *ctx); +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, +const char *subscriptionName, +const pulsar_consumer_configuration_t *conf, +pulsar_subscribe_callback callback, void *ctx); + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); + /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified * topic. 
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc index 905e41067f..ecefe20c03 100644 --- a/pulsar-client-cpp/lib/c/c_Client.cc +++ b/pulsar-client-cpp/lib/c/c_Client.cc @@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c boost::bind(_subscribe_callback, _1, _2, callback, ctx)); } +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, +const char *subscriptionName, +const pulsar_consumer_configuration_t *conf, +pulsar_subscribe_callback callback, void *ctx) { +std::vector topicsList; +for (int i = 0; i < topicsCount; i++) { +topicsList.push_back(topics[i]); +} + +client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration, + boost::bind(_subscribe_callback, _1, _2, callback, ctx)); +} + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { +client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration, + boost::bind(_subscribe_callback, _1, _2, callback, ctx)); +} + pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic, const pulsar_message_id_t *startMessageId, pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) { diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go index aebabc877d..093dd9dc6e 100644 --- a/pulsar-client-go/pulsar/c_consumer.go +++ b/pulsar-client-go/pulsar/c_consumer.go @@ -25,10 +25,10 @@ package pulsar import "C" import ( + "context" "runtime" "time" "unsafe" - "context" ) type consumer struct { @@ -64,7 +64,7 @@ type subscribeContext struct { } func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) { - if options.Topic == "" { + if options.Topic == "" 
&& options.Topics == nil && options.TopicsPattern == "" { go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required")) return } @@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu C.pulsar_consumer_set_consumer_name(conf, name) } - topic := C.CString(options.Topic) subName := C.CString(options.SubscriptionName) - defer C.free(unsafe.Pointer(topic)) defer C.free(unsafe.Pointer(subName)) - C._pulsar_client_subscribe_async(client.ptr, topic,
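The Go change relaxes the `topic is required` check so that setting `Topic`, `Topics`, or `TopicsPattern` is enough. The C wrapper forwards a pattern to the C++ client's `subscribeWithRegexAsync`. At its core, a pattern subscription decides which existing topics a regex selects. The Java sketch below illustrates that step. It assumes the topic list is already known and that the pattern must match the whole topic name (`Matcher.matches`). `TopicPatternFilter` is a hypothetical helper, not part of any Pulsar client. It also treats an empty topic list as missing, which is slightly stricter than the Go code's `options.Topics == nil` test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper illustrating multi-topic / regex topic selection.
final class TopicPatternFilter {

    private TopicPatternFilter() {
    }

    /** Similar to the relaxed Go check (but also rejects an empty list): some way of naming topics must be given. */
    static boolean hasTopicSelector(String topic, List<String> topics, String topicsPattern) {
        return (topic != null && !topic.isEmpty())
                || (topics != null && !topics.isEmpty())
                || (topicsPattern != null && !topicsPattern.isEmpty());
    }

    /** Returns the topics whose full name matches the regex, preserving input order. */
    static List<String> matchingTopics(List<String> allTopics, String topicsPattern) {
        Pattern pattern = Pattern.compile(topicsPattern);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```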
[GitHub] merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
merlimat commented on issue #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages URL: https://github.com/apache/incubator-pulsar/pull/2455#issuecomment-416401591 run java8 tests
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138309 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink implements Sink { +// - Runtime fields +private JdbcSinkConfig jdbcSinkConfig; +@Getter +private Connection connection; +private String tableName; + +private JdbcUtils.TableId tableId; +private PreparedStatement insertStatement; + +// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) +protected String schema; +protected JdbcUtils.TableDefinition tableDefinition; + +// for flush +private List> incomingList; +private List> swapList; +private AtomicBoolean isFlushing; +private int timeoutMs; +private int batchSize; +private ScheduledExecutorService flushExecutor; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +jdbcSinkConfig = JdbcSinkConfig.load(config); + +String jdbcUrl = jdbcSinkConfig.getJdbcUrl(); +if (jdbcSinkConfig.getJdbcUrl() == null) { +throw new IllegalArgumentException("Required jdbc Url not set."); +} + +Properties properties = new Properties(); +String username = jdbcSinkConfig.getUserName(); +String password = jdbcSinkConfig.getPassword(); +if (username != null) { +properties.setProperty("user", username); +} +if 
(password != null) { +properties.setProperty("password", password); +} + +connection = JdbcUtils.getConnection(jdbcUrl, properties); +log.info("Connection opened"); Review comment: nit: log.info("Open jdbc connection : {}", jdbcUrl);
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139143 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139185 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139561 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java ## @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils + */ +@Slf4j +public class JdbcUtils { + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableId { +private final String catalogName; +private final String schemaName; +private final String tableName; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class ColumnId { +private final TableId tableId; +private final String name; +// SQL type from java.sql.Types +private final int type; +private final String typeName; +// column position in table +private final int position; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableDefinition { +private final TableId tableId; +private final List columns; +} + +/** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types + */ +public static String getDriverClassName(String driver) throws Exception { +if (driver.equals("mysql")) { +return "com.mysql.jdbc.Driver"; +} if (driver.equals("sqlite")) { +return "org.sqlite.JDBC"; +} else { +throw new Exception("Not tested jdbc driver type: " + driver); +} +} + +/** + * Get the {@link Connection} for the given jdbcUrl. 
+ */ +public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception { +String driver = jdbcUrl.split(":")[1]; +String driverClassName = getDriverClassName(driver); +Class.forName(driverClassName); + +return DriverManager.getConnection(jdbcUrl, properties); +} + +/** + * Get the {@link TableId} for the given tableName. + */ +public static TableId getTableId(Connection connection, String tableName) throws Exception { +DatabaseMetaData metadata = connection.getMetaData(); +try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) { +if (rs.next()) { +String catalogName = rs.getString(1); +String schemaName = rs.getString(2); +String gotTableName = rs.getString(3); +checkState(tableName.equals(gotTableName), +"TableName not match: " + tableName + " Got: " + gotTableName); +log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName); Review comment: if (log.isDebugEnabled()) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
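The reviewer asks for the `log.debug(...)` call to be wrapped in `if (log.isDebugEnabled())`. With SLF4J's `{}` placeholders, the message is only formatted when DEBUG is on. However, a call with three arguments still goes through the varargs overload, which allocates an `Object[]`, and every argument expression is still evaluated. The guard skips both. This sketch uses `java.util.logging` so it runs without extra dependencies; its `FINE` level plays the role of DEBUG. `describeTable` and `formatCalls` are hypothetical and exist only to count how often the arguments are built.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedDebugLogging {
    private static final Logger LOG = Logger.getLogger(GuardedDebugLogging.class.getName());

    // Demonstration only: counts how often the log arguments are actually built.
    static int formatCalls = 0;

    // Hypothetical helper standing in for any non-trivial argument computation.
    static String describeTable(String catalog, String schema, String table) {
        formatCalls++;
        return catalog + ", " + schema + ", " + table;
    }

    // FINE is java.util.logging's closest counterpart to SLF4J's DEBUG level.
    static void setDebugEnabled(boolean enabled) {
        LOG.setLevel(enabled ? Level.FINE : Level.INFO);
    }

    static void logTable(String catalog, String schema, String table) {
        // Same shape as `if (log.isDebugEnabled()) { log.debug(...); }` with SLF4J.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Get Table: " + describeTable(catalog, schema, table));
        }
    }

    public static void main(String[] args) {
        setDebugEnabled(false);
        logTable("def", "public", "orders");
        System.out.println(formatCalls); // 0: arguments were never built
        setDebugEnabled(true);
        logTable("def", "public", "orders");
        System.out.println(formatCalls); // 1
    }
}
```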
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140305 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java ## @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(kafkaServiceName) .withHostName(clusterName + "-" + kafkaServiceName))); + final String cassandraServiceName = "cassandra"; externalServices.put( cassandraServiceName, new CassandraContainer(clusterName)); + +// use mySQL for jdbc test +final String jdbcServiceName = "jdbc"; +externalServices.put( +jdbcServiceName, +new MySQLContainer() Review comment: attach clusterName to the hostname ``` new MySQLContainer() .withNetworkAliases(jdbcServiceName) .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(jdbcServiceName) .withHostName(clusterName + "-" + jdbcServiceName))); ```
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138387 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink implements Sink { +// - Runtime fields +private JdbcSinkConfig jdbcSinkConfig; +@Getter +private Connection connection; +private String tableName; + +private JdbcUtils.TableId tableId; +private PreparedStatement insertStatement; + +// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) +protected String schema; +protected JdbcUtils.TableDefinition tableDefinition; + +// for flush +private List> incomingList; +private List> swapList; +private AtomicBoolean isFlushing; +private int timeoutMs; +private int batchSize; +private ScheduledExecutorService flushExecutor; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +jdbcSinkConfig = JdbcSinkConfig.load(config); + +String jdbcUrl = jdbcSinkConfig.getJdbcUrl(); +if (jdbcSinkConfig.getJdbcUrl() == null) { +throw new IllegalArgumentException("Required jdbc Url not set."); +} + +Properties properties = new Properties(); +String username = jdbcSinkConfig.getUserName(); +String password = jdbcSinkConfig.getPassword(); +if (username != null) { +properties.setProperty("user", username); +} +if 
(password != null) { +properties.setProperty("password", password); +} + +connection = JdbcUtils.getConnection(jdbcUrl, properties); +log.info("Connection opened"); + +schema = jdbcSinkConfig.getSchema(); +tableName = jdbcSinkConfig.getTableName(); +tableId = JdbcUtils.getTableId(connection, tableName); +tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); +insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + +timeoutMs = jdbcSinkConfig.getTimeoutMs(); +batchSize = jdbcSinkConfig.getBatchSize(); +incomingList = Lists.newArrayList(); +swapList = Lists.newArrayList(); +isFlushing = new AtomicBoolean(false); + +flushExecutor = Executors.newScheduledThreadPool(1); +flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); +} + +@Override +public void close() throws Exception { +if (!connection.getAutoCommit()) { +connection.commit(); +} +flushExecutor.shutdown(); +if (connection != null) { +connection.close(); +} +log.info("Connection Closed"); Review comment: nit: log.info("Closed jdbc connection : {}", jdbcUrl); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139604 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java ## @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils + */ +@Slf4j +public class JdbcUtils { + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableId { +private final String catalogName; +private final String schemaName; +private final String tableName; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class ColumnId { +private final TableId tableId; +private final String name; +// SQL type from java.sql.Types +private final int type; +private final String typeName; +// column position in table +private final int position; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableDefinition { +private final TableId tableId; +private final List columns; +} + +/** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types + */ +public static String getDriverClassName(String driver) throws Exception { +if (driver.equals("mysql")) { +return "com.mysql.jdbc.Driver"; +} if (driver.equals("sqlite")) { +return "org.sqlite.JDBC"; +} else { +throw new Exception("Not tested jdbc driver type: " + driver); +} +} + +/** + * Get the {@link Connection} for the given jdbcUrl. 
+ */ +public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception { +String driver = jdbcUrl.split(":")[1]; +String driverClassName = getDriverClassName(driver); +Class.forName(driverClassName); + +return DriverManager.getConnection(jdbcUrl, properties); +} + +/** + * Get the {@link TableId} for the given tableName. + */ +public static TableId getTableId(Connection connection, String tableName) throws Exception { +DatabaseMetaData metadata = connection.getMetaData(); +try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) { +if (rs.next()) { +String catalogName = rs.getString(1); +String schemaName = rs.getString(2); +String gotTableName = rs.getString(3); +checkState(tableName.equals(gotTableName), +"TableName not match: " + tableName + " Got: " + gotTableName); +log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName); +return TableId.of(catalogName, schemaName, tableName); +} else { +throw new Exception("Not able to find table: " + tableName); +} +} +} + +/** + * Get the {@link TableDefinition} for the given table. + */ +public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception { +TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList()); + +try (ResultSet rs = connection.getMetaData().getColumns( +tableId.getCatalogName(), +tableId.getSchemaName(), +
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139256 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink implements Sink { +// - Runtime fields +private JdbcSinkConfig jdbcSinkConfig; +@Getter +private Connection connection; +private String tableName; + +private JdbcUtils.TableId tableId; +private PreparedStatement insertStatement; + +// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) +protected String schema; +protected JdbcUtils.TableDefinition tableDefinition; + +// for flush +private List> incomingList; +private List> swapList; +private AtomicBoolean isFlushing; +private int timeoutMs; +private int batchSize; +private ScheduledExecutorService flushExecutor; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +jdbcSinkConfig = JdbcSinkConfig.load(config); + +String jdbcUrl = jdbcSinkConfig.getJdbcUrl(); +if (jdbcSinkConfig.getJdbcUrl() == null) { +throw new IllegalArgumentException("Required jdbc Url not set."); +} + +Properties properties = new Properties(); +String username = jdbcSinkConfig.getUserName(); +String password = jdbcSinkConfig.getPassword(); +if (username != null) { +properties.setProperty("user", username); +} +if 
(password != null) { +properties.setProperty("password", password); +} + +connection = JdbcUtils.getConnection(jdbcUrl, properties); +log.info("Connection opened"); + +schema = jdbcSinkConfig.getSchema(); +tableName = jdbcSinkConfig.getTableName(); +tableId = JdbcUtils.getTableId(connection, tableName); +tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); +insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + +timeoutMs = jdbcSinkConfig.getTimeoutMs(); +batchSize = jdbcSinkConfig.getBatchSize(); +incomingList = Lists.newArrayList(); +swapList = Lists.newArrayList(); +isFlushing = new AtomicBoolean(false); + +flushExecutor = Executors.newScheduledThreadPool(1); +flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); +} + +@Override +public void close() throws Exception { +if (!connection.getAutoCommit()) { +connection.commit(); +} +flushExecutor.shutdown(); +if (connection != null) { +connection.close(); +} +log.info("Connection Closed"); +} + +@Override +public void write(Record record) throws Exception { +int number; +synchronized (incomingList) { +incomingList.add(record); +number = incomingList.size(); +} + +if (number == batchSize) { +flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS); +} +} + +// bind value with a
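The `JdbcAbstractSink` code quoted above buffers records in `incomingList`, schedules an immediate flush when the list reaches `batchSize`, and also flushes on a fixed `timeoutMs` timer, with `swapList` and `isFlushing` declared for the flush itself. The `flush()` body is cut off in this archive, so the following is only a minimal sketch of that double-buffer pattern under our own assumptions. `BatchingBuffer` and the `batchWriter` callback (a stand-in for binding values and calling `executeBatch()` on the prepared statement) are hypothetical, not the connector's code. The sketch also shows one possible `close()` ordering: stop the timer first, then drain what is left, so records written after the last scheduled flush are not lost. Separately, in the quoted `close()`, `connection.getAutoCommit()` runs before the `connection != null` check, so a null connection would throw a `NullPointerException` before that check is reached. Likewise, the `jdbcUrl` suggested in the logging nit is a local variable in `open()`, so it would have to be kept in a field or read back from `jdbcSinkConfig`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class BatchingBuffer<T> {
    private final int batchSize;
    // Hypothetical stand-in for "bind values + executeBatch()" in a JDBC sink.
    private final Consumer<List<T>> batchWriter;
    private List<T> incoming = new ArrayList<>(); // appended to by write()
    private List<T> swap = new ArrayList<>();     // only touched by the single active flush
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final ScheduledExecutorService flushExecutor = Executors.newSingleThreadScheduledExecutor();

    public BatchingBuffer(int batchSize, long timeoutMs, Consumer<List<T>> batchWriter) {
        this.batchSize = batchSize;
        this.batchWriter = batchWriter;
        // Periodic flush, mirroring scheduleAtFixedRate(() -> flush(), ...) in the quoted open().
        flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public void write(T record) {
        int size;
        synchronized (this) {
            incoming.add(record);
            size = incoming.size();
        }
        // Size-triggered flush, mirroring `number == batchSize` in the quoted write().
        if (size == batchSize) {
            flushExecutor.execute(this::flush);
        }
    }

    void flush() {
        // Only one flush at a time; a concurrent trigger simply returns.
        if (!isFlushing.compareAndSet(false, true)) {
            return;
        }
        try {
            synchronized (this) {
                // Swap buffers so writers can keep appending while this batch is written.
                List<T> tmp = incoming;
                incoming = swap;
                swap = tmp;
            }
            if (!swap.isEmpty()) {
                batchWriter.accept(new ArrayList<>(swap));
                swap.clear();
            }
        } finally {
            isFlushing.set(false);
        }
    }

    public void close() {
        // shutdown() cancels the periodic task; already-submitted flushes still run.
        flushExecutor.shutdown();
        try {
            flushExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(); // drain records that arrived after the last flush
    }
}
```

In a real sink, the commit and `connection.close()` would follow the final drain.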
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213139527 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java ## @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils + */ +@Slf4j +public class JdbcUtils { + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableId { +private final String catalogName; +private final String schemaName; +private final String tableName; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class ColumnId { +private final TableId tableId; +private final String name; +// SQL type from java.sql.Types +private final int type; +private final String typeName; +// column position in table +private final int position; +} + +@Data(staticConstructor = "of") +@Setter +@Getter +@EqualsAndHashCode +@ToString +public static class TableDefinition { +private final TableId tableId; +private final List columns; +} + +/** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types + */ +public static String getDriverClassName(String driver) throws Exception { +if (driver.equals("mysql")) { +return "com.mysql.jdbc.Driver"; +} if (driver.equals("sqlite")) { +return "org.sqlite.JDBC"; +} else { +throw new Exception("Not tested jdbc driver type: " + driver); Review comment: probably log a warning message rather than failing it? This is an automated message from the Apache Git Service. 
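The reviewer suggests logging a warning instead of throwing for driver types other than `mysql` and `sqlite`. That can work because JDBC 4.0 and later drivers register themselves with `DriverManager` through the `java.util.ServiceLoader` mechanism when their jar is on the classpath. As a result, `DriverManager.getConnection(jdbcUrl, properties)` can usually find an untested driver without an explicit `Class.forName`. The following is a minimal sketch that uses `java.util.logging` in place of SLF4J so it runs standalone. `DriverLookup`, `driverType`, and the `Optional` return type are our own choices, not the connector's API. The sketch also replaces the quoted `} if (...)` chain, which only behaves correctly because the first branch returns, with a `switch`.

```java
import java.util.Optional;
import java.util.logging.Logger;

public class DriverLookup {
    private static final Logger LOG = Logger.getLogger(DriverLookup.class.getName());

    /** Returns the driver class for a tested type, or empty (after a warning) for an untested one. */
    static Optional<String> getDriverClassName(String driver) {
        switch (driver) {
            case "mysql":
                return Optional.of("com.mysql.jdbc.Driver");
            case "sqlite":
                return Optional.of("org.sqlite.JDBC");
            default:
                // Warn instead of failing; a JDBC 4.0+ driver on the classpath may still be found.
                LOG.warning("Not tested jdbc driver type: " + driver
                        + "; relying on DriverManager to locate a driver");
                return Optional.empty();
        }
    }

    /** Extracts the subprotocol, e.g. "mysql" from "jdbc:mysql://host:3306/db". */
    static String driverType(String jdbcUrl) {
        String[] parts = jdbcUrl.split(":");
        if (parts.length < 2 || !"jdbc".equals(parts[0])) {
            throw new IllegalArgumentException("Not a jdbc url: " + jdbcUrl);
        }
        return parts[1];
    }
}
```

A caller like `getConnection` would call `Class.forName` only when a class name is returned, and otherwise go straight to `DriverManager.getConnection`.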
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213138978 ## File path: pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java ## @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink implements Sink { +// - Runtime fields +private JdbcSinkConfig jdbcSinkConfig; +@Getter +private Connection connection; +private String tableName; + +private JdbcUtils.TableId tableId; +private PreparedStatement insertStatement; + +// TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) +protected String schema; +protected JdbcUtils.TableDefinition tableDefinition; + +// for flush +private List> incomingList; +private List> swapList; +private AtomicBoolean isFlushing; +private int timeoutMs; +private int batchSize; +private ScheduledExecutorService flushExecutor; + +@Override +public void open(Map config, SinkContext sinkContext) throws Exception { +jdbcSinkConfig = JdbcSinkConfig.load(config); + +String jdbcUrl = jdbcSinkConfig.getJdbcUrl(); +if (jdbcSinkConfig.getJdbcUrl() == null) { +throw new IllegalArgumentException("Required jdbc Url not set."); +} + +Properties properties = new Properties(); +String username = jdbcSinkConfig.getUserName(); +String password = jdbcSinkConfig.getPassword(); +if (username != null) { +properties.setProperty("user", username); +} +if 
(password != null) { +properties.setProperty("password", password); +} + +connection = JdbcUtils.getConnection(jdbcUrl, properties); +log.info("Connection opened"); + +schema = jdbcSinkConfig.getSchema(); +tableName = jdbcSinkConfig.getTableName(); +tableId = JdbcUtils.getTableId(connection, tableName); +tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); +insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + +timeoutMs = jdbcSinkConfig.getTimeoutMs(); +batchSize = jdbcSinkConfig.getBatchSize(); +incomingList = Lists.newArrayList(); +swapList = Lists.newArrayList(); +isFlushing = new AtomicBoolean(false); + +flushExecutor = Executors.newScheduledThreadPool(1); +flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); +} + +@Override +public void close() throws Exception { +if (!connection.getAutoCommit()) { +connection.commit(); +} +flushExecutor.shutdown(); +if (connection != null) { +connection.close(); +} +log.info("Connection Closed"); +} + +@Override +public void write(Record record) throws Exception { +int number; +synchronized (incomingList) { +incomingList.add(record); +number = incomingList.size(); +} + +if (number == batchSize) { +flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS); +} +} + +// bind value with a
[GitHub] sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector
sijie commented on a change in pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440#discussion_r213140171 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java ## @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(kafkaServiceName) .withHostName(clusterName + "-" + kafkaServiceName))); + final String cassandraServiceName = "cassandra"; externalServices.put( cassandraServiceName, new CassandraContainer(clusterName)); + +// use mySQL for jdbc test +final String jdbcServiceName = "jdbc"; Review comment: jdbcServiceName = "mysql"
[GitHub] sijie commented on issue #2406: Fix flake in DiscoveryServiceTest (#1081)
sijie commented on issue #2406: Fix flake in DiscoveryServiceTest (#1081) URL: https://github.com/apache/incubator-pulsar/pull/2406#issuecomment-416392332 retest this please
[GitHub] sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context
sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context URL: https://github.com/apache/incubator-pulsar/pull/2411#issuecomment-416392265 Taking this out of 2.2. Will revisit it for 2.3.
[GitHub] aahmed-se opened a new pull request #2456: Remove Aerospike Connector
aahmed-se opened a new pull request #2456: Remove Aerospike Connector URL: https://github.com/apache/incubator-pulsar/pull/2456 Remove the Aerospike connector due to licensing concerns with its dependent libraries.
[GitHub] merlimat opened a new pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages
merlimat opened a new pull request #2455: Fixed {{pulsar.version}} into {{pulsar:version}} in few pages URL: https://github.com/apache/incubator-pulsar/pull/2455 ### Motivation In a few places on the website, we're displaying the literal `{{pulsar.version}}` instead of the actual Pulsar version string. The correct variable name uses `:` as the separator.
[GitHub] sijie commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message
sijie commented on issue #2452: [Schema] Provide a generic record interface for representing a typed message URL: https://github.com/apache/incubator-pulsar/pull/2452#issuecomment-416346124 @merlimat I have addressed your comments. PTAL
[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213092862 ## File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java ## @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * A field in a record, consisting of a field name, index, and + * {@link org.apache.pulsar.client.api.Schema} for the field value. + */ +@Data +@EqualsAndHashCode +@ToString +public class Field { + +private final String name; Review comment: done
[GitHub] sijie opened a new pull request #2454: Fix compilation issue on CLITest
sijie opened a new pull request #2454: Fix compilation issue on CLITest URL: https://github.com/apache/incubator-pulsar/pull/2454 ### Motivation #2182 was cherry-picked into branch-2.1, but some of its integration test changes were not cherry-picked along with it, so the compilation is broken. ### Changes Fix the compilation issue on CLITest.
[GitHub] merlimat commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
merlimat commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213078989 ## File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java ## @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import java.util.List; + +/** + * An interface represents a message with schema. + */ +public interface GenericRecord { + +/** + * Returns the list of fields associated with the record. + * + * @return the list of fields associated with the record. + */ +List getFields(); Review comment: Sure, we can do the minimum required by JDBC now and continue later. Just make sure the API can evolve gracefully when nested objects are added later.
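merlimat's concern is that the flat `getFields()` list should not need a breaking change once nested objects are supported. The following sketch shows one way to keep that property, using hypothetical `SimpleField` and `SimpleRecord` classes that are not the API in #2452. The idea is to let a field's value itself be a record, so a nested object is reached through the same `getFields()` and `getField()` calls. A JDBC sink would only need the flat case, binding each value with something like `PreparedStatement.setObject(field.index + 1, value)`, because JDBC parameter indexes start at 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class GenericRecordSketch {
    /** Hypothetical field descriptor: a name plus its position in the record. */
    static final class SimpleField {
        final String name;
        final int index;

        SimpleField(String name, int index) {
            this.name = name;
            this.index = index;
        }
    }

    /** Hypothetical record: an ordered field list and values looked up by field. */
    static final class SimpleRecord {
        private final List<SimpleField> fields = new ArrayList<>();
        private final List<Object> values = new ArrayList<>();

        SimpleRecord put(String name, Object value) {
            fields.add(new SimpleField(name, fields.size()));
            values.add(value);
            return this;
        }

        List<SimpleField> getFields() {
            return Collections.unmodifiableList(fields);
        }

        // A nested object is just a value that is itself a SimpleRecord, so adding
        // nesting later does not change the getFields()/getField() signatures.
        Object getField(SimpleField field) {
            return values.get(field.index);
        }
    }
}
```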
[GitHub] sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty
sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty URL: https://github.com/apache/incubator-pulsar/pull/2372#issuecomment-416328437 Actually, this change is related to a bunch of other changes, so I'm not going to merge it into branch-2.1.
[incubator-pulsar] branch branch-2.1 updated: Revert "Kinesis-sink consider topic-name as partition-key if record key empty (#2372)"
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9ede4c8 Revert "Kinesis-sink consider topic-name as partition-key if record key empty (#2372)" 9ede4c8 is described below commit 9ede4c80802626065f70fb5e20b81bafb6f33969 Author: Sijie Guo AuthorDate: Mon Aug 27 11:47:56 2018 -0700 Revert "Kinesis-sink consider topic-name as partition-key if record key empty (#2372)" This reverts commit 703305b5426856bab7bab30a41e4f242e7782dc7. --- .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index cdfadde..67de21a 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -114,7 +114,7 @@ public class KinesisSink implements Sink { record.getRecordSequence()); throw new IllegalStateException("kinesis queue has publish failure"); } -String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey)); +String partitionedKey = record.getKey().orElse(defaultPartitionedKey); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256
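After the revert, `KinesisSink` again uses the record key when present, otherwise falls back to `defaultPartitionedKey`, and then truncates the result. The following is a minimal standalone sketch of that selection; `PartitionKeys.choose` and its parameters are hypothetical names. One detail is worth noting: `String.substring`'s end index is exclusive, so the diff's `substring(0, maxPartitionedKeyLength - 1)` keeps `maxPartitionedKeyLength - 1` characters, while this sketch keeps exactly `maxLength`.

```java
import java.util.Optional;

final class PartitionKeys {
    // Hypothetical helper: prefer the record key, else the default key, and
    // cap the length. Length is counted in Java chars (UTF-16 code units).
    static String choose(Optional<String> recordKey, String defaultKey, int maxLength) {
        String key = recordKey.orElse(defaultKey);
        if (key.isEmpty()) {
            // The comment in the diff requires a key of at least one character.
            throw new IllegalArgumentException("partition key must not be empty");
        }
        // substring's end index is exclusive, so this keeps exactly maxLength chars.
        return key.length() > maxLength ? key.substring(0, maxLength) : key;
    }
}
```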
[GitHub] srkukarni commented on issue #2451: Issue 2312: add python client multi-topics consumer support
srkukarni commented on issue #2451: Issue 2312: add python client multi-topics consumer support URL: https://github.com/apache/incubator-pulsar/pull/2451#issuecomment-416327192 Once #2448 is in, I will add support for functions as well.
[GitHub] sijie commented on issue #2450: Added Reader.HasNext in Go client
sijie commented on issue #2450: Added Reader.HasNext in Go client URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416326440 merged as b6dc7c1 in branch-2.1
[incubator-pulsar] branch branch-2.1 updated: Added Reader.HasNext in Go client (#2450)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new b6dc7c1 Added Reader.HasNext in Go client (#2450) b6dc7c1 is described below commit b6dc7c1eae314e65cf782a7b0201408279dc9e8d Author: Matteo Merli AuthorDate: Mon Aug 27 11:41:24 2018 -0700 Added Reader.HasNext in Go client (#2450) ### Motivation Added `Reader.HasNext()` in Go client library --- pulsar-client-go/pulsar/c_reader.go| 13 + pulsar-client-go/pulsar/reader.go | 3 +++ pulsar-client-go/pulsar/reader_test.go | 12 ++-- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go index 730f9b8..12c1103 100644 --- a/pulsar-client-go/pulsar/c_reader.go +++ b/pulsar-client-go/pulsar/c_reader.go @@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } } +func (r *reader) HasNext() (bool, error) { + value := C.int(0) + res := C.pulsar_reader_has_message_available(r.ptr, &value) + + if res != C.pulsar_result_Ok { + return false, newError(res, "Failed to check if next message is available") + } else if value == C.int(1) { + return true, nil + } else { + return false, nil + } +} + func (r *reader) Close() error { channel := make(chan error) r.CloseAsync(func(err error) { channel <- err; close(channel) }) diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go index f61ebd7..7015c9c 100644 --- a/pulsar-client-go/pulsar/reader.go +++ b/pulsar-client-go/pulsar/reader.go @@ -67,6 +67,9 @@ type Reader interface { // Read the next message in the topic, blocking until a message is available Next(context.Context) (Message, error) + // Check if there is any message available to read from the current position + HasNext() (bool, error) + // Close the reader and stop the broker to push more messages 
Close() error } diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 11d1b36..3b075e1 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" ) func TestReaderConnectError(t *testing.T) { @@ -80,12 +80,20 @@ func TestReader(t *testing.T) { t.Fatal(err) } + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, true) + msg, err := reader.Next(ctx) assertNil(t, err) assertNotNil(t, msg) assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) } + + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, false) } func TestReaderWithInvalidConf(t *testing.T) {
[incubator-pulsar] branch master updated: Added Reader.HasNext in Go client (#2450)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 73bc19d Added Reader.HasNext in Go client (#2450) 73bc19d is described below commit 73bc19dc41b6a06bd7051216682cc5d2fc5d5592 Author: Matteo Merli AuthorDate: Mon Aug 27 11:41:24 2018 -0700 Added Reader.HasNext in Go client (#2450) ### Motivation Added `Reader.HasNext()` in Go client library --- pulsar-client-go/pulsar/c_reader.go| 13 + pulsar-client-go/pulsar/reader.go | 3 +++ pulsar-client-go/pulsar/reader_test.go | 12 ++-- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go index 730f9b8..12c1103 100644 --- a/pulsar-client-go/pulsar/c_reader.go +++ b/pulsar-client-go/pulsar/c_reader.go @@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } } +func (r *reader) HasNext() (bool, error) { + value := C.int(0) + res := C.pulsar_reader_has_message_available(r.ptr, &value) + + if res != C.pulsar_result_Ok { + return false, newError(res, "Failed to check if next message is available") + } else if value == C.int(1) { + return true, nil + } else { + return false, nil + } +} + func (r *reader) Close() error { channel := make(chan error) r.CloseAsync(func(err error) { channel <- err; close(channel) }) diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go index f61ebd7..7015c9c 100644 --- a/pulsar-client-go/pulsar/reader.go +++ b/pulsar-client-go/pulsar/reader.go @@ -67,6 +67,9 @@ type Reader interface { // Read the next message in the topic, blocking until a message is available Next(context.Context) (Message, error) + // Check if there is any message available to read from the current position + HasNext() (bool, error) + // Close the reader and stop the broker to push more messages Close() 
error } diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 11d1b36..3b075e1 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" ) func TestReaderConnectError(t *testing.T) { @@ -80,12 +80,20 @@ func TestReader(t *testing.T) { t.Fatal(err) } + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, true) + msg, err := reader.Next(ctx) assertNil(t, err) assertNotNil(t, msg) assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) } + + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, false) } func TestReaderWithInvalidConf(t *testing.T) {
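The Go `HasNext()` wraps `pulsar_reader_has_message_available`, which reports availability through an integer flag that is then mapped to a `bool`; the updated test checks that it is `true` before each `Next()` and `false` once the published messages have been consumed. The Java sketch below reproduces only that contract, using a hypothetical in-memory reader (`InMemoryReader` is not part of any Pulsar client) so it can run without a broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical in-memory stand-in for a topic reader; not a Pulsar client API.
final class InMemoryReader {
    private final Deque<String> backlog = new ArrayDeque<>();

    // Appends a message at the end of the "topic".
    void publish(String payload) {
        backlog.addLast(payload);
    }

    // Mirrors HasNext(): reports whether a message is available from the
    // current position without consuming it.
    boolean hasNext() {
        return !backlog.isEmpty();
    }

    // Unlike the Go Next(ctx), this does not block: it fails fast when empty.
    String next() {
        String message = backlog.pollFirst();
        if (message == null) {
            throw new NoSuchElementException("no message available");
        }
        return message;
    }
}
```

Draining with `while (reader.hasNext()) { process(reader.next()); }` (where `process` stands for caller code) stops once the backlog is empty instead of blocking; on a live topic, new messages may still arrive after `hasNext()` returns `false`.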
[incubator-pulsar] branch branch-2.1 updated: Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 2595941 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) 2595941 is described below commit 259594165f5ee3c69a29629534af81011cab4ef1 Author: Jia Zhai AuthorDate: Tue Aug 28 02:38:41 2018 +0800 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) * change getTopicName in MultiTopicsConsumer * change following sijie's comments * keep both topicName and topicPartitonName in consumer to avoid new string --- .../apache/pulsar/client/impl/ConsumerImpl.java| 8 +++ .../client/impl/MultiTopicsConsumerImpl.java | 9 .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++ .../pulsar/client/impl/TopicMessageImpl.java | 20 .../client/impl/UnAckedTopicMessageTracker.java| 4 ++-- .../pulsar/client/impl/MessageIdCompareToTest.java | 6 - .../tests/integration/semantics/SemanticsTest.java | 2 +- 7 files changed, 60 insertions(+), 16 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1a7b67b..fe0e2c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; +private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle 
NonPersistentAcknowledgmentGroupingTracker.of(); } +topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle this.connectionHandler.grabCnx(); } +public String getTopicNameWithoutPartition() { +return topicNameWithoutPartition; +} + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bbfb3f3..6d5d56d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -230,7 +230,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { -TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message); +TopicMessageImpl topicMessage = new TopicMessageImpl<>( +consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -369,7 +370,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } if (ackType == AckType.Cumulative) { -Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); +Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -377,7 +378,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { -ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName()); +ConsumerImpl 
consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -510,7 +511,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) -
[GitHub] sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer
sijie commented on issue #2346: Issue #2330: change getTopicName in MultiTopicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/2346#issuecomment-416325631 merged as 2595941 into branch-2.1
[incubator-pulsar] branch master updated: Issue #2330: change getTopicName in MultiTopicsConsumer (#2346)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8688b67 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) 8688b67 is described below commit 8688b6733177ee4a48d805df18a1d82865cf7320 Author: Jia Zhai AuthorDate: Tue Aug 28 02:38:41 2018 +0800 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) * change getTopicName in MultiTopicsConsumer * change following sijie's comments * keep both topicName and topicPartitonName in consumer to avoid new string --- .../apache/pulsar/client/impl/ConsumerImpl.java| 8 +++ .../client/impl/MultiTopicsConsumerImpl.java | 9 .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++ .../pulsar/client/impl/TopicMessageImpl.java | 20 .../client/impl/UnAckedTopicMessageTracker.java| 4 ++-- .../pulsar/client/impl/MessageIdCompareToTest.java | 6 - .../tests/integration/semantics/SemanticsTest.java | 2 +- 7 files changed, 60 insertions(+), 16 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index da04534..fe37b69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; +private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle 
NonPersistentAcknowledgmentGroupingTracker.of(); } +topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle this.connectionHandler.grabCnx(); } +public String getTopicNameWithoutPartition() { +return topicNameWithoutPartition; +} + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 799bec8..f1cb9cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -231,7 +231,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { -TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message); +TopicMessageImpl topicMessage = new TopicMessageImpl<>( +consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -370,7 +371,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } if (ackType == AckType.Cumulative) { -Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); +Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -378,7 +379,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { -ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName()); +ConsumerImpl 
consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -511,7 +512,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) -
[GitHub] sijie closed pull request #2346: Issue #2330: change getTopicName in MultiTopicsConsumer
sijie closed pull request #2346: Issue #2330: change getTopicName in MultiTopicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/2346 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index da04534d94..fe37b69a50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; +private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ NonPersistentAcknowledgmentGroupingTracker.of(); } +topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ void grabCnx() { this.connectionHandler.grabCnx(); } +public String getTopicNameWithoutPartition() { +return topicNameWithoutPartition; +} + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 799bec8e4d..f1cb9cf262 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -231,7 +231,8 @@ private void messageReceived(ConsumerImpl consumer, 
Message message) { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { -TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message); +TopicMessageImpl topicMessage = new TopicMessageImpl<>( +consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -370,7 +371,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } if (ackType == AckType.Cumulative) { -Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); +Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -378,7 +379,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { -ConsumerImpl consumer = consumers.get(topicMessageId.getTopicName()); +ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -511,7 +512,7 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) -.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet())) + .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) .forEach((topicName, messageIds1) -> consumers.get(topicName) .redeliverUnacknowledgedMessages(messageIds1.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 071b804ffb..dd1b37dd33 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,20 +18,39 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; + import java.util.Objects; import org.apache.pulsar.client.api.MessageId;
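In #2346, `ConsumerImpl` keeps the partition-less name (`topicName.getPartitionedTopicName()`) alongside the full partition name, and `MultiTopicsConsumerImpl` now looks up per-partition consumers, and groups redelivered ids, by `getTopicPartitionName()`. A minimal sketch of both ideas follows, assuming Pulsar's `<topic>-partition-<index>` naming convention; `PartitionNames` and its nested `TopicMessageId` are hypothetical stand-ins, not the client classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class PartitionNames {
    // Partition i of topic T is assumed to be named T + "-partition-" + i.
    static final String PARTITION_SUFFIX = "-partition-";

    // Strips a trailing "-partition-<digits>"; other names are returned unchanged.
    static String withoutPartition(String name) {
        int idx = name.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return name;
        }
        String index = name.substring(idx + PARTITION_SUFFIX.length());
        return index.matches("\\d+") ? name.substring(0, idx) : name;
    }

    // Hypothetical message id: the partition it came from plus an entry id.
    static final class TopicMessageId {
        final String topicPartitionName;
        final long entryId;

        TopicMessageId(String topicPartitionName, long entryId) {
            this.topicPartitionName = topicPartitionName;
            this.entryId = entryId;
        }
    }

    // Groups ids by full partition name, so each group can be handed to the
    // consumer that owns that partition, as the redelivery path in the diff does.
    static Map<String, Set<Long>> groupByPartition(List<TopicMessageId> ids) {
        return ids.stream().collect(Collectors.groupingBy(
                id -> id.topicPartitionName,
                TreeMap::new,
                Collectors.mapping(id -> id.entryId, Collectors.toSet())));
    }
}
```

Grouping by the partition name rather than the base topic name matters because a multi-topics consumer holds one inner consumer per partition, so a base name alone cannot identify which consumer should redeliver.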
[GitHub] sijie commented on issue #2434: Add pulsar flink sink connector
sijie commented on issue #2434: Add pulsar flink sink connector URL: https://github.com/apache/incubator-pulsar/pull/2434#issuecomment-416325086 retest this please
[GitHub] sijie commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
sijie commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213071325 ## File path: pulsar-client-cpp/python/pulsar/__init__.py ## @@ -503,6 +503,211 @@ def my_listener(consumer, message): self._consumers.append(c) return c +def subscribe_topics(self, topics, subscription_name, Review comment: @jiazhai can you open a new issue addressing this comment?
[incubator-pulsar] branch branch-2.1 updated: fix issue with protocol version not being correct (#2444)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 96bf00f fix issue with protocol version not being correct (#2444) 96bf00f is described below commit 96bf00fa12bf74db57d36f40989e233752288351 Author: Boyang Jerry Peng AuthorDate: Fri Aug 24 22:40:58 2018 -0700 fix issue with protocol version not being correct (#2444) --- .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 6a8ab74..b2d9478 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -232,8 +232,9 @@ public class ClientCnx extends PulsarHandler { if (log.isDebugEnabled()) { log.debug("{} Connection is ready", ctx.channel()); } -connectionFuture.complete(null); +// set remote protocol version to the correct version before we complete the connection future remoteEndpointProtocolVersion = connected.getProtocolVersion(); +connectionFuture.complete(null); state = State.Ready; }
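The #2444 fix moves the assignment of `remoteEndpointProtocolVersion` ahead of `connectionFuture.complete(null)`. This matters because callbacks already attached to a `CompletableFuture` through non-async methods such as `thenRun` typically run on the completing thread, inside `complete()` itself, so anything they read must be set beforehand. The sketch below reproduces that ordering with a hypothetical `Connection` class; the version number `12` is arbitrary.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the client connection handler.
final class Connection {
    final CompletableFuture<Void> connectionFuture = new CompletableFuture<>();
    volatile int remoteProtocolVersion = -1; // -1 means "not negotiated yet"

    // Original order: dependents registered earlier run inside complete(),
    // before the version is assigned, so they observe -1.
    void onConnectedBeforeFix(int version) {
        connectionFuture.complete(null);
        remoteProtocolVersion = version;
    }

    // Fixed order: publish the negotiated version, then complete the future.
    void onConnectedAfterFix(int version) {
        remoteProtocolVersion = version;
        connectionFuture.complete(null);
    }
}
```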
[GitHub] sijie commented on issue #2444: fix issue with protocol version not being correct
sijie commented on issue #2444: fix issue with protocol version not being correct URL: https://github.com/apache/incubator-pulsar/pull/2444#issuecomment-416323693 merged as 96bf00f in branch-2.1
[GitHub] sijie commented on issue #2439: leak assign schema in MessageImpl constructor
sijie commented on issue #2439: leak assign schema in MessageImpl constructor URL: https://github.com/apache/incubator-pulsar/pull/2439#issuecomment-416323334 merged as acd77a0 in branch-2.1
[incubator-pulsar] branch branch-2.1 updated: leak assign schema in MessageImpl constructor (#2439)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new acd77a0 leak assign schema in MessageImpl constructor (#2439) acd77a0 is described below commit acd77a0fe6f09de1919040b65a90fb145f7f1bfa Author: Jia Zhai AuthorDate: Fri Aug 24 11:42:55 2018 +0800 leak assign schema in MessageImpl constructor (#2439) --- .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 5ea8bfe..96b68c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -155,6 +155,7 @@ public class MessageImpl implements Message { this.cnx = null; this.payload = payload; this.properties = Collections.unmodifiableMap(properties); +this.schema = schema; } public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException {
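#2439 adds the missing `this.schema = schema;` to a `MessageImpl` constructor. One general Java safeguard against this class of bug is to declare such fields `final`: the compiler then rejects any constructor that leaves them unassigned. Whether `MessageImpl`'s fields can be final depends on code not shown here; the class below is a hypothetical simplification, with a decoder function standing in for a schema.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical simplified message; not Pulsar's MessageImpl.
final class SimpleMessage<T> {
    private final byte[] payload;
    private final Map<String, String> properties;
    // Because this field is final, deleting the assignment in the constructor
    // is a compile-time error instead of a silent null at runtime.
    private final Function<byte[], T> decoder;

    SimpleMessage(byte[] payload, Map<String, String> properties, Function<byte[], T> decoder) {
        this.payload = payload.clone();
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
        this.decoder = decoder;
    }

    T getValue() {
        return decoder.apply(payload);
    }

    Map<String, String> getProperties() {
        return properties;
    }
}
```

If `this.decoder = decoder;` were removed, `javac` would fail with "variable decoder might not have been initialized".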
[incubator-pulsar] branch branch-2.1 updated: Increased default brokerShutdownTimeout to 60 seconds (#2377)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new caf8d67 Increased default brokerShutdownTimeout to 60 seconds (#2377) caf8d67 is described below commit caf8d676f0a49896269b57032bd07ae3b4287564 Author: Matteo Merli AuthorDate: Wed Aug 15 11:50:18 2018 -0700 Increased default brokerShutdownTimeout to 60 seconds (#2377) ### Motivation The default timeout for broker graceful shutdown is set to 3 seconds. This leaves little room for a graceful shutdown when the broker is serving a lot of topics. There is no big downside to increasing the timeout to a much larger value. --- conf/broker.conf| 2 +- conf/standalone.conf| 2 +- deployment/terraform-ansible/templates/broker.conf | 2 +- .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2e40d82..cd4a505 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -53,7 +53,7 @@ failureDomainsEnabled=false zooKeeperSessionTimeoutMillis=30000 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=3000 +brokerShutdownTimeoutMs=60000 # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 0d7a395..cc03b95 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -46,7 +46,7 @@ failureDomainsEnabled=false zooKeeperSessionTimeoutMillis=30000 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=3000 +brokerShutdownTimeoutMs=60000 # Enable backlog quota check.
Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 73d4e2f..a7456a4 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -53,7 +53,7 @@ failureDomainsEnabled=false zooKeeperSessionTimeoutMillis=30000 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed -brokerShutdownTimeoutMs=3000 +brokerShutdownTimeoutMs=60000 # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1bb6fff..ddc7728 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -78,7 +78,7 @@ public class ServiceConfiguration implements PulsarConfiguration { // Time to wait for broker graceful shutdown. After this time elapses, the // process will be killed @FieldContext(dynamic = true) -private long brokerShutdownTimeoutMs = 3000; +private long brokerShutdownTimeoutMs = 60000; // Enable backlog quota check. Enforces action on topic when the quota is // reached private boolean backlogQuotaCheckEnabled = true;
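`brokerShutdownTimeoutMs` bounds how long the broker waits for a graceful shutdown before the process is killed; #2377 raises the default from 3000 ms to 60 seconds. The general pattern of a bounded graceful shutdown can be sketched with a standard `ExecutorService`: stop accepting work, wait up to the timeout, then force-stop. This illustrates the pattern only and is not the broker's actual shutdown code; `GracefulShutdown` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    // Returns true if all submitted work finished within timeoutMs,
    // false if the timeout elapsed and remaining work was interrupted.
    static boolean shutdown(ExecutorService executor, long timeoutMs) throws InterruptedException {
        executor.shutdown(); // reject new tasks, let in-flight tasks finish
        if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            return true;
        }
        executor.shutdownNow(); // timeout elapsed: interrupt whatever is left
        return false;
    }
}
```

With the new default, a caller would pass `60_000`; the old 3000 ms gives a broker with many topics little time to finish before the forced stop.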
[GitHub] sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty
sijie commented on issue #2372: Kinesis-sink consider topic-name as partition-key if record key empty URL: https://github.com/apache/incubator-pulsar/pull/2372#issuecomment-416319348 merged as 703305b in branch-2.1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2420: [client] add properties to producer for cpp & python client
sijie commented on issue #2420: [client] add properties to producer for cpp & python client URL: https://github.com/apache/incubator-pulsar/pull/2420#issuecomment-416321563 cherry-pick as d7ec1c5 in branch-2.1
[incubator-pulsar] branch branch-2.1 updated: [client] add properties to producer for cpp & python client (#2420)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new d7ec1c5 [client] add properties to producer for cpp & python client (#2420) d7ec1c5 is described below commit d7ec1c5051ee3f7066cd1ba1572606760a3c371e Author: Sijie Guo AuthorDate: Wed Aug 22 09:32:46 2018 -0700 [client] add properties to producer for cpp & python client (#2420) * [client] add properties to producer for cpp & python client ### Motivation This is a catch-up change to enable properties on producers, matching the Java client. ### Changes Enable properties on producers for both the cpp & python clients. ### Results Properties are added as metadata for CommandProducer. However, there is no way to verify the producer properties, so I didn't add any specific tests; I just added properties for both the cpp and python clients in the tests, which should exercise the corresponding code path.
* Add `properties` to pydoc --- .../include/pulsar/ProducerConfiguration.h | 34 ++ pulsar-client-cpp/lib/Commands.cc | 11 ++- pulsar-client-cpp/lib/Commands.h | 3 +- pulsar-client-cpp/lib/ProducerConfiguration.cc | 34 +- pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 1 + pulsar-client-cpp/lib/ProducerImpl.cc | 3 +- pulsar-client-cpp/python/pulsar/__init__.py| 11 ++- pulsar-client-cpp/python/src/config.cc | 1 + pulsar-client-cpp/python/test_producer.py | 6 +++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++ 10 files changed, 100 insertions(+), 6 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 9f7bf1f..45154c5 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -127,6 +127,40 @@ class ProducerConfiguration { bool isEncryptionEnabled() const; ProducerConfiguration& addEncryptionKey(std::string key); +/** + * Check whether the message has a specific property attached. + * + * @param name the name of the property to check + * @return true if the message has the specified property + * @return false if the property is not defined + */ +bool hasProperty(const std::string& name) const; + +/** + * Get the value of a specific property + * + * @param name the name of the property + * @return the value of the property or null if the property was not defined + */ +const std::string& getProperty(const std::string& name) const; + +/** + * Get all the properties attached to this producer. + */ +std::map<std::string, std::string>& getProperties() const; + +/** + * Sets a new property on a message.
+ * @param name the name of the property + * @param value the associated value + */ +ProducerConfiguration& setProperty(const std::string& name, const std::string& value); + +/** + * Add all the properties in the provided map + */ +ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties); + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 8bd0128..4d9f1be 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -22,6 +22,7 @@ #include "pulsar/MessageBuilder.h" #include "PulsarApi.pb.h" #include "LogUtils.h" +#include "PulsarApi.pb.h" #include "Utils.h" #include "Url.h" #include "checksum/ChecksumProvider.h" @@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) { } SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId, - const std::string& producerName, uint64_t requestId) { + const std::string& producerName, uint64_t requestId, + const std::map<std::string, std::string>& metadata) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); producer->set_topic(topic); producer->set_producer_id(producerId); producer->set_request_id(requestId); +for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end(); + it++) { +proto::KeyValue* keyValue = proto::KeyValue().New(); +keyValue->set_key(it->first); +keyValue->set_value(it->second); +producer->mutable_metadata()->AddAllocated(keyValue); +} if (!producerName.empty()) { producer->set_producer_name(producerName); diff --git a/pulsar-client-cpp/lib/Commands.h
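The C++ change keeps producer properties in a string-to-string `std::map` and, in `Commands::newProducer`, copies each entry into a `proto::KeyValue` appended to the `CommandProducer` metadata. Below is a rough Java analogue of that flow, assuming a plain string-to-string map; `ProducerPropertiesSketch` and its nested `KeyValue` are hypothetical stand-ins for the C++ `ProducerConfiguration` and the protobuf message, not real Pulsar classes. A `TreeMap` keeps iteration sorted by key, as `std::map` does, and `getProperty` returns an `Optional` because the C++ signature returns a `const std::string&`, which cannot actually be null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical Java analogue of the C++ ProducerConfiguration properties API.
final class ProducerPropertiesSketch {
    // Hypothetical stand-in for the protobuf KeyValue message.
    static final class KeyValue {
        final String key;
        final String value;

        KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Sorted by key, like std::map.
    private final Map<String, String> properties = new TreeMap<>();

    ProducerPropertiesSketch setProperty(String name, String value) {
        properties.put(name, value);
        return this; // fluent, like the C++ ProducerConfiguration& return type
    }

    ProducerPropertiesSketch setProperties(Map<String, String> props) {
        properties.putAll(props);
        return this;
    }

    boolean hasProperty(String name) {
        return properties.containsKey(name);
    }

    Optional<String> getProperty(String name) {
        return Optional.ofNullable(properties.get(name));
    }

    Map<String, String> getProperties() {
        return Collections.unmodifiableMap(properties);
    }

    // Mirrors the loop in Commands::newProducer: one KeyValue per map entry.
    List<KeyValue> toMetadata() {
        List<KeyValue> metadata = new ArrayList<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            metadata.add(new KeyValue(e.getKey(), e.getValue()));
        }
        return metadata;
    }
}
```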
[incubator-pulsar] branch branch-2.1 updated: [client] add properties to consumer for cpp & python client (#2423)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new ffde1bc [client] add properties to consumer for cpp & python client (#2423) ffde1bc is described below commit ffde1bcd45c73e94d5f41a05fc6fb2c6f31d3764 Author: Sijie Guo AuthorDate: Wed Aug 22 02:41:00 2018 -0700 [client] add properties to consumer for cpp & python client (#2423) * [client] add properties to consumer for cpp & python client ### Motivation This is a catch-up change to enable properties on consumers, matching the Java client. ### Changes Enable properties on consumers for both the cpp & python clients. ### Results Properties are added as metadata for CommandSubscribe. However, there is no way to verify the consumer properties, so I didn't add any specific tests; I just added properties for both the cpp and python clients in the tests, which should exercise the corresponding code path.
* remove "make format" --- .../include/pulsar/ConsumerConfiguration.h | 34 ++ pulsar-client-cpp/lib/Commands.cc | 12 +++- pulsar-client-cpp/lib/Commands.h | 2 +- pulsar-client-cpp/lib/ConsumerConfiguration.cc | 32 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 ++- pulsar-client-cpp/lib/ConsumerImpl.cc | 2 +- pulsar-client-cpp/python/pulsar/__init__.py| 11 ++- pulsar-client-cpp/python/src/config.cc | 1 + pulsar-client-cpp/python/test_consumer.py | 6 +++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 6 +++- 10 files changed, 103 insertions(+), 7 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index 36e5808..0687166 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -162,6 +162,40 @@ class ConsumerConfiguration { void setPatternAutoDiscoveryPeriod(int periodInSeconds); int getPatternAutoDiscoveryPeriod() const; +/** + * Check whether the message has a specific property attached. + * + * @param name the name of the property to check + * @return true if the message has the specified property + * @return false if the property is not defined + */ +bool hasProperty(const std::string& name) const; + +/** + * Get the value of a specific property + * + * @param name the name of the property + * @return the value of the property or null if the property was not defined + */ +const std::string& getProperty(const std::string& name) const; + +/** + * Get all the properties attached to this producer. + */ +std::map<std::string, std::string>& getProperties() const; + +/** + * Sets a new property on a message.
+ * @param name the name of the property + * @param value the associated value + */ +ConsumerConfiguration& setProperty(const std::string& name, const std::string& value); + +/** + * Add all the properties in the provided map + */ +ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties); + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 8a1933b..8bd0128 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -20,6 +20,7 @@ #include "MessageImpl.h" #include "Version.h" #include "pulsar/MessageBuilder.h" +#include "PulsarApi.pb.h" #include "LogUtils.h" #include "Utils.h" #include "Url.h" @@ -27,6 +28,7 @@ #include #include +using namespace pulsar; namespace pulsar { using namespace pulsar::proto; @@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, -Optional startMessageId, bool readCompacted) { +Optional startMessageId, bool readCompacted, +const std::map<std::string, std::string>& metadata) { BaseCommand cmd; cmd.set_type(BaseCommand::SUBSCRIBE); CommandSubscribe* subscribe = cmd.mutable_subscribe(); @@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& messageIdData.set_batch_index(startMessageId.value().batchIndex()); } } +for (std::map<std::string, std::string>::const_iterator it
[GitHub] sijie commented on issue #2399: Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl
sijie commented on issue #2399: Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl URL: https://github.com/apache/incubator-pulsar/pull/2399#issuecomment-416320663 merged as 93b6d20 in branch-2.1
[GitHub] sijie commented on issue #2387: Fixed race condition during expansion of concurrent open hash maps
sijie commented on issue #2387: Fixed race condition during expansion of concurrent open hash maps URL: https://github.com/apache/incubator-pulsar/pull/2387#issuecomment-416320271 cherry-pick as 6146ca9 to branch-2.1
[GitHub] sijie commented on issue #2377: Increased default brokerShutdownTimeout to 60 seconds
sijie commented on issue #2377: Increased default brokerShutdownTimeout to 60 seconds URL: https://github.com/apache/incubator-pulsar/pull/2377#issuecomment-416319699 merged as 7416fc0 in branch-2.1
[incubator-pulsar] branch branch-2.1 updated: Fixed race condition during expansion of concurrent open hash maps (#2387)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 6146ca9 Fixed race condition during expansion of concurrent open hash maps (#2387) 6146ca9 is described below commit 6146ca9cc4c307bf4b19915ca6f13280efd7d428 Author: Matteo Merli AuthorDate: Fri Aug 17 08:47:30 2018 -0700 Fixed race condition during expansion of concurrent open hash maps (#2387) ### Motivation Porting the same fix as https://github.com/apache/bookkeeper/pull/1607 to correct the issue reported in https://github.com/apache/bookkeeper/issues/1606. There is a race condition in the concurrent open hash maps implementation. The race happens when a map gets re-hashed after expansion and the new arrays replace the old ones. The race itself is that a thread doing a `get()` on the map first checks the current capacity of the map, uses it to find the bucket, and then tries an optimistic read of the value in that bucket. This assumes the `capacity` update becomes visible only after the `values` array has already been swapped, but that is not always the case in the current code.
### Changes * Use the `volatile` qualifier for `capacity` and the `values` arrays to ensure the ordering of memory reads is respected by the compiler * In rehashing, update `capacity` after `values` where that was not already the case --- .../util/collections/ConcurrentLongHashMap.java| 8 +-- .../util/collections/ConcurrentLongPairSet.java| 15 +++--- .../util/collections/ConcurrentOpenHashMap.java| 6 +-- .../util/collections/ConcurrentOpenHashSet.java| 6 +-- .../collections/ConcurrentLongHashMapTest.java | 57 -- 5 files changed, 72 insertions(+), 20 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java index fcb0c10..60c24c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java @@ -30,7 +30,7 @@ import com.google.common.collect.Lists; /** * Map from long to an Object.
- * + * Provides similar methods as a ConcurrentMap with 2 differences: * * No boxing/unboxing from long -> Long @@ -187,10 +187,10 @@ public class ConcurrentLongHashMap<V> { // A section is a portion of the hash map that is covered by a single @SuppressWarnings("serial") private static final class Section<V> extends StampedLock { -private long[] keys; -private V[] values; +private volatile long[] keys; +private volatile V[] values; -private int capacity; +private volatile int capacity; private volatile int size; private int usedBuckets; private int resizeThreshold; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java index 74d4314..4634b40 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.StampedLock; -import java.util.function.Predicate; /** * Concurrent hash set where values are composed of pairs of longs. @@ -163,11 +162,11 @@ public class ConcurrentLongPairSet { /** * Removes all of the elements of this collection that satisfy the given predicate.
- * + * * @param filter *a predicate which returns {@code true} for elements to be removed * @return {@code true} if any elements were removed - * + * * @return number of removed values */ public int removeIf(LongPairPredicate filter) { @@ -209,9 +208,9 @@ public class ConcurrentLongPairSet { @SuppressWarnings("serial") private static final class Section extends StampedLock { // Keys and values are stored interleaved in the table array -private long[] table; +private volatile long[] table; -private int capacity; +private volatile int capacity; private volatile int size; private int usedBuckets; private int resizeThreshold; @@ -449,9 +448,11 @@ public class ConcurrentLongPairSet { } } -capacity = newCapacity; table = newTable;
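The ordering argument in this commit can be illustrated with a stripped-down, single-section map. The sketch below is hypothetical and far simpler than `ConcurrentLongHashMap` (no removal, a non-null value doubles as the occupancy marker, a fixed 50% load factor), but it follows the rules described in the commit message: `keys`, `values` and `capacity` are `volatile`, the rehash publishes the new arrays before the new capacity, and `get()` reads `capacity` before the arrays inside a `StampedLock` optimistic read, retrying under a read lock if `validate()` fails.

```java
import java.util.Objects;
import java.util.concurrent.locks.StampedLock;

// Hypothetical, simplified single-section long -> V map; not Pulsar's ConcurrentLongHashMap.
final class SectionSketch<V> {
    private final StampedLock lock = new StampedLock();
    private volatile long[] keys;
    private volatile Object[] values; // null slot == empty bucket
    private volatile int capacity;    // always a power of two
    private int size;                 // only modified under the write lock

    SectionSketch(int initialCapacity) {
        if (initialCapacity < 2 || Integer.bitCount(initialCapacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        keys = new long[initialCapacity];
        values = new Object[initialCapacity];
        capacity = initialCapacity;
    }

    V get(long key) {
        long stamp = lock.tryOptimisticRead();
        // Read capacity BEFORE the arrays: seeing the new capacity implies seeing the new arrays.
        int cap = capacity;
        V result = probe(keys, values, cap, key);
        if (!lock.validate(stamp)) {
            // A writer ran concurrently: discard the optimistic result, retry under a read lock.
            stamp = lock.readLock();
            try {
                result = probe(keys, values, capacity, key);
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return result;
    }

    void put(long key, V value) {
        Objects.requireNonNull(value, "value"); // null is reserved for empty buckets
        long stamp = lock.writeLock();
        try {
            if ((size + 1) * 2 > capacity) {
                rehash(capacity * 2); // keep the load factor at or below 50%
            }
            if (insert(keys, values, capacity, key, value)) {
                size++;
            }
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    int capacity() {
        return capacity;
    }

    private void rehash(int newCapacity) {
        long[] newKeys = new long[newCapacity];
        Object[] newValues = new Object[newCapacity];
        for (int i = 0; i < capacity; i++) {
            if (values[i] != null) {
                insert(newKeys, newValues, newCapacity, keys[i], values[i]);
            }
        }
        // Publish the new arrays first and the new capacity last.
        keys = newKeys;
        values = newValues;
        capacity = newCapacity;
    }

    private static int bucket(long key, int cap) {
        long h = key * 0x9E3779B97F4A7C15L; // multiplicative mixing
        return (int) (h ^ (h >>> 32)) & (cap - 1);
    }

    // Linear-probing insert; returns true if the key was not present before.
    private static boolean insert(long[] k, Object[] v, int cap, long key, Object value) {
        int idx = bucket(key, cap);
        while (v[idx] != null && k[idx] != key) {
            idx = (idx + 1) & (cap - 1);
        }
        boolean added = v[idx] == null;
        k[idx] = key;
        v[idx] = value;
        return added;
    }

    @SuppressWarnings("unchecked")
    private static <T> T probe(long[] k, Object[] v, int cap, long key) {
        int idx = bucket(key, cap);
        for (int i = 0; i < cap; i++) { // bounded, so a torn optimistic view cannot loop forever
            Object val = v[idx];
            if (val == null) {
                return null;
            }
            if (k[idx] == key) {
                return (T) val;
            }
            idx = (idx + 1) & (cap - 1);
        }
        return null;
    }
}
```

Because a volatile read of `capacity` that observes the new value also observes the earlier writes to `keys` and `values`, a reader can never pair the larger capacity with the old, smaller arrays and index past their end; any other inconsistent view (old capacity with new arrays) stays in bounds and is discarded by the failed `validate()`.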
[incubator-pulsar] branch branch-2.1 updated: Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 703305b Kinesis-sink consider topic-name as partition-key if record key empty (#2372) 703305b is described below commit 703305b5426856bab7bab30a41e4f242e7782dc7 Author: Rajan Dhabalia AuthorDate: Tue Aug 14 15:41:45 2018 -0700 Kinesis-sink consider topic-name as partition-key if record key empty (#2372) --- .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 67de21a..cdfadde 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -114,7 +114,7 @@ public class KinesisSink implements Sink { record.getRecordSequence()); throw new IllegalStateException("kinesis queue has publish failure"); } -String partitionedKey = record.getKey().orElse(defaultPartitionedKey); +String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey)); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256
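The one-line change makes the partition key fall back from the record key to the topic name, and only then to the configured default. A minimal sketch of that selection, assuming `Optional<String>` inputs as used by `record.getKey()` and `record.getTopicName()` in the diff; the helper name `partitionKey` is hypothetical. Note that the truncation branch keeps `maxLength - 1` characters, exactly as the original expression does.

```java
import java.util.Optional;

// Hypothetical helper mirroring the partition-key expression in the KinesisSink diff.
final class PartitionKeySketch {
    static String partitionKey(Optional<String> recordKey, Optional<String> topicName,
                               String defaultKey, int maxLength) {
        // Prefer the record key, then the topic name, then the configured default.
        String key = recordKey.orElse(topicName.orElse(defaultKey));
        // Same truncation as the diff: over-long keys keep maxLength - 1 characters.
        return key.length() > maxLength ? key.substring(0, maxLength - 1) : key;
    }
}
```

Using the topic name instead of a single constant default means records from different topics without keys no longer all land on the same Kinesis shard.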
[incubator-pulsar] branch branch-2.1 updated: [compaction] make topic compaction works with partitioned topic (#2367)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new f97a8f7 [compaction] make topic compaction works with partitioned topic (#2367) f97a8f7 is described below commit f97a8f7d136ddc609d01ce54e20f45e9949b04df Author: Sijie Guo AuthorDate: Tue Aug 14 11:02:24 2018 -0700 [compaction] make topic compaction works with partitioned topic (#2367) * [compaction] make topic compaction works with partitioned topic ### Motivation Topic compaction doesn't work with partitioned topics. ### Changes - make `RawReaderImpl` and `ReaderImpl` return messages with the partition idx - make broker service `Consumer` deliver MessageIdData with the partition idx - add an integration test to ensure compaction works with partitioned topics --- .../java/org/apache/pulsar/broker/service/Consumer.java | 9 +++-- .../org/apache/pulsar/client/impl/RawReaderImpl.java | 16 .../org/apache/pulsar/compaction/TwoPhaseCompactor.java | 8 .../PersistentDispatcherFailoverConsumerTest.java| 2 +- .../java/org/apache/pulsar/client/impl/ReaderImpl.java | 4 +++- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c3befa5..4c54018 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -73,6 +73,7 @@ public class Consumer { private final String appId; private AuthenticationDataSource authenticationData; private final String topicName; +private final int partitionIdx; private final InitialPosition subscriptionInitialPosition; private final long consumerId; @@ -119,6 +120,7 @@ public class Consumer { this.subscription = subscription;
this.subType = subType; this.topicName = topicName; +this.partitionIdx = TopicName.getPartitionIndex(topicName); this.consumerId = consumerId; this.priorityLevel = priorityLevel; this.readCompacted = readCompacted; @@ -239,8 +241,11 @@ public class Consumer { Entry entry = entries.get(i); PositionImpl pos = (PositionImpl) entry.getPosition(); MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder(); -MessageIdData messageId = messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId()) -.build(); +MessageIdData messageId = messageIdBuilder +.setLedgerId(pos.getLedgerId()) +.setEntryId(pos.getEntryId()) +.setPartition(partitionIdx) +.build(); ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index e768c3e..ae1a4db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader { RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, CompletableFuture> consumerFuture) { -super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1, -consumerFuture, SubscriptionMode.Durable, 
MessageId.earliest, Schema.BYTES); +super(client, +conf.getSingleTopic(), +conf, +client.externalExecutorProvider().getExecutor(), +TopicName.getPartitionIndex(conf.getSingleTopic()), +consumerFuture, +SubscriptionMode.Durable, +MessageId.earliest, +Schema.BYTES); incomingRawMessages = new GrowableArrayBlockingQueue<>();
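The broker now stamps every `MessageIdData` with `TopicName.getPartitionIndex(topicName)`, so messages read from an individual partition carry their partition number. Below is a hypothetical re-implementation of such a lookup, assuming partitions are named with a `-partition-<N>` suffix and that a non-partitioned topic maps to `-1`; it is not the actual `TopicName` code.

```java
// Hypothetical partition-index lookup; assumes the "-partition-<N>" naming convention.
final class PartitionIndexSketch {
    static final String PARTITION_SUFFIX = "-partition-";

    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1; // not a partition of a partitioned topic
        }
        String digits = topic.substring(pos + PARTITION_SUFFIX.length());
        try {
            return Integer.parseInt(digits);
        } catch (NumberFormatException e) {
            return -1; // suffix present but not followed by a number
        }
    }
}
```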
[GitHub] sijie commented on issue #2311: [cpp] receiver queue size config acorss partitions in multi-topics-consumer
sijie commented on issue #2311: [cpp] receiver queue size config acorss partitions in multi-topics-consumer URL: https://github.com/apache/incubator-pulsar/pull/2311#issuecomment-416318241 merged as bcb3d3f in master
[incubator-pulsar] 02/02: [cpp] receiver queue size config acorss partitions in multi-topics-consumer (#2311)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git commit bcb3d3f6d15995b74c51d0507fece569361928da Author: Jia Zhai AuthorDate: Tue Aug 14 06:05:59 2018 +0800 [cpp] receiver queue size config acorss partitions in multi-topics-consumer (#2311) * catch up receiver queue size support in multi topics consumer * add python config --- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 5 - pulsar-client-cpp/python/pulsar/__init__.py| 13 pulsar-client-cpp/python/src/config.cc | 4 .../client/impl/MultiTopicsConsumerImpl.java | 23 +- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 7be197c..6750273 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -160,9 +160,12 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs()); config.setMessageListener( boost::bind(::messageReceived, shared_from_this(), _1, _2)); -config.setReceiverQueueSize(conf_.getReceiverQueueSize()); int numPartitions = partitionMetadata->getPartitions() >= 1 ? 
partitionMetadata->getPartitions() : 1; +// Apply total limit of receiver queue size across partitions +config.setReceiverQueueSize( +std::min(conf_.getReceiverQueueSize(), + (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions))); Lock lock(mutex_); topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions)); diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 434fb07..f3b560b 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -315,6 +315,7 @@ class Client: send_timeout_millis=30000, compression_type=CompressionType.NONE, max_pending_messages=1000, +max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, @@ -352,6 +353,9 @@ class Client: * `max_pending_messages`: Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. +* `max_pending_messages_across_partitions`: + Set the max size of the queue holding the messages pending to receive + an acknowledgment across partitions from the broker. * `block_if_queue_full`: Set whether `send_async` operations should block when the outgoing message queue is full.
* `message_routing_mode`: @@ -364,6 +368,7 @@ class Client: _check_type(int, send_timeout_millis, 'send_timeout_millis') _check_type(CompressionType, compression_type, 'compression_type') _check_type(int, max_pending_messages, 'max_pending_messages') +_check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') _check_type(bool, block_if_queue_full, 'block_if_queue_full') _check_type(bool, batching_enabled, 'batching_enabled') _check_type(int, batching_max_messages, 'batching_max_messages') @@ -374,6 +379,7 @@ class Client: conf.send_timeout_millis(send_timeout_millis) conf.compression_type(compression_type) conf.max_pending_messages(max_pending_messages) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) conf.block_if_queue_full(block_if_queue_full) conf.batching_enabled(batching_enabled) conf.batching_max_messages(batching_max_messages) @@ -392,6 +398,7 @@ class Client: consumer_type=ConsumerType.Exclusive, message_listener=None, receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, @@ -434,6 +441,9 @@ class Client: should not be interrupted when the consumer queue size is zero. The default value is 1000 messages and should work well for most use cases. +* `max_total_receiver_queue_size_across_partitions` + Set the max total receiver queue size across partitions. + This setting will be used to reduce the receiver queue size for individual partitions *
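The C++ change caps each partition's receiver queue at `min(receiverQueueSize, maxTotalReceiverQueueSizeAcrossPartitions / numPartitions)`, treating a topic that reports fewer than one partition as a single partition. For example, with a per-consumer size of 1000 and a total of 50000, a 100-partition topic gets 50000 / 100 = 500 messages per partition, while a 10-partition topic keeps 1000. A Java transcription of that arithmetic, with a hypothetical class name:

```java
// Hypothetical Java transcription of the per-partition limit in MultiTopicsConsumerImpl.cc.
final class ReceiverQueueSizing {
    static int perPartitionQueueSize(int receiverQueueSize, int maxTotalAcrossPartitions,
                                     int reportedPartitions) {
        // A topic reporting fewer than one partition is treated as a single partition.
        int numPartitions = reportedPartitions >= 1 ? reportedPartitions : 1;
        // Keep the per-consumer setting as an upper bound and split the total budget evenly.
        return Math.min(receiverQueueSize, maxTotalAcrossPartitions / numPartitions);
    }
}
```

Integer division means the per-partition value can drop to 0 if the partition count exceeds the total; this sketch, like the C++ expression, does not guard against that.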
[incubator-pulsar] branch branch-2.1 updated (b070cfd -> bcb3d3f)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a change to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git. from b070cfd [docker] Publish pulsar-all image to docker hub (#2361) new 068db59 Fix: function with multi-topic not acking on effectively-once (#2347) new bcb3d3f [cpp] receiver queue size config acorss partitions in multi-topics-consumer (#2311) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../api/PartitionedProducerConsumerTest.java | 9 ++--- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 10 +++--- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 5 ++- pulsar-client-cpp/python/pulsar/__init__.py| 13 +++ pulsar-client-cpp/python/src/config.cc | 4 +++ .../client/impl/MultiTopicsConsumerImpl.java | 40 ++ .../pulsar/functions/source/PulsarSource.java | 24 +++-- 7 files changed, 67 insertions(+), 38 deletions(-)
[incubator-pulsar] 01/02: Fix: function with multi-topic not acking on effectively-once (#2347)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git commit 068db598adb529d1ae441084c280f0e776d2cbbc Author: Rajan Dhabalia AuthorDate: Mon Aug 13 14:19:46 2018 -0700 Fix: function with multi-topic not acking on effectively-once (#2347) ### Motivation `MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync`; therefore, a function with multiple input topics and `EFFECTIVELY_ONCE` processing does not ack messages, which breaks `EFFECTIVELY_ONCE` behavior. ### Modifications The function should ack the message on the specific per-topic consumer if `inputTopicConsumer` is a multi-topic consumer. ### Result The function should be able to ack messages for a multi-topic consumer when the processing guarantee is `EFFECTIVELY_ONCE`. --- .../api/PartitionedProducerConsumerTest.java | 9 ++-- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 10 + .../client/impl/MultiTopicsConsumerImpl.java | 17 +-- .../pulsar/functions/source/PulsarSource.java | 24 -- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index a599532..ae7757d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { } try { -producer = pulsarClient.newProducer().topic(topicName.toString()) -.enableBatching(false) -.messageRoutingMode(MessageRoutingMode.SinglePartition) -.create(); +producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); 
consumer =
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe(); producer.send("message1".getBytes()); producer.send("message2".getBytes()); /* Message msg1 = */ consumer.receive(); Message msg2 = consumer.receive(); consumer.acknowledgeCumulative(msg2); -Assert.fail("should fail since ack cumulative is not supported for partitioned topic"); -} catch (PulsarClientException e) { -Assert.assertTrue(e instanceof PulsarClientException.NotSupportedException); } finally { producer.close(); consumer.unsubscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5398bc9..2001981 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -356,11 +357,11 @@ public class PulsarSinkE2ETest { retryStrategically((test) -> { try { SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); -return subStats.unackedMessages == 0; +return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; } catch (PulsarAdminException e) { return false; } -}, 5, 500); +}, 5, 200); FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); functionRuntimeManager.updateRates(); @@ -400,11 +401,12 @@ public class 
PulsarSinkE2ETest { functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); functionDetailsBuilder.setParallelism(1); functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE); // set source spec // source spec classname should be empty so that the default pulsar source will be used
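A minimal sketch of the idea in the Modifications section above, assuming only that every message records the topic it was read from. Cumulative acknowledgement is only meaningful within one topic's ordering, so a wrapper over several topics forwards the ack to the consumer that owns the message's topic instead of acking "cumulatively" across all of them. `TopicConsumer`, `TopicMessage`, and `MultiTopicAcker` are hypothetical stand-ins that use no Pulsar client API, and positions are plain `long`s rather than real message IDs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-topic consumer: remembers the highest position
// acknowledged cumulatively on its own topic.
class TopicConsumer {
    final String topic;
    long ackedUpTo = -1;

    TopicConsumer(String topic) {
        this.topic = topic;
    }

    // Cumulative ack: everything up to and including `position` is acknowledged.
    // An older position never moves the mark backwards.
    void acknowledgeCumulative(long position) {
        ackedUpTo = Math.max(ackedUpTo, position);
    }
}

// Hypothetical message shape: the topic it came from plus its position there.
class TopicMessage {
    final String topic;
    final long position;

    TopicMessage(String topic, long position) {
        this.topic = topic;
        this.position = position;
    }
}

// Positions on different topics are not comparable, so the wrapper routes
// each cumulative ack to the consumer for that message's topic.
class MultiTopicAcker {
    private final Map<String, TopicConsumer> consumers = new HashMap<>();

    MultiTopicAcker(String... topics) {
        for (String t : topics) {
            consumers.put(t, new TopicConsumer(t));
        }
    }

    void acknowledgeCumulative(TopicMessage msg) {
        TopicConsumer c = consumers.get(msg.topic);
        if (c == null) {
            throw new IllegalArgumentException("Unknown topic: " + msg.topic);
        }
        c.acknowledgeCumulative(msg.position);
    }

    long ackedUpTo(String topic) {
        return consumers.get(topic).ackedUpTo;
    }
}
```

Acking `t1@5`, then `t2@2`, then `t1@3` leaves `t1` acknowledged up to 5 and `t2` up to 2: each topic keeps its own cumulative mark.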
[GitHub] david-streamlio commented on a change in pull request #2409: Added HDFS Sink
david-streamlio commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r213064055

File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java

```diff
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A file or comma separated list of files which contains the Hadoop file system configuration.
+ * Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file
+ * or will revert to a default configuration.
+ */
+private String hdfsConfigResources;
+
+/**
+ * The HDFS directory from which files should be read from or written to.
+ */
+private String directory;
+
+/**
+ * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+ */
+private String encoding;
+
+/**
+ * The compression codec used to compress/de-compress the files on HDFS.
+ */
+private Compression compression;
+
+/**
+ * The Kerberos user principal account to use for authentication.
+ */
+private String kerberosUserPrincipal;
+
+/**
+ * The full pathname to the Kerberos keytab file to use for authentication.
+ */
+private String keytab;
+
+public void validate() {
+if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
```

Review comment:
I will change the comment, since we cannot assume that this Sink will be executing on a node that has the Hadoop client configuration files available.
[GitHub] srkukarni commented on a change in pull request #2409: Added HDFS Sink
srkukarni commented on a change in pull request #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#discussion_r213060424

File path: pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java

```diff
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A file or comma separated list of files which contains the Hadoop file system configuration.
+ * Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file
+ * or will revert to a default configuration.
+ */
+private String hdfsConfigResources;
+
+/**
+ * The HDFS directory from which files should be read from or written to.
+ */
+private String directory;
+
+/**
+ * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+ */
+private String encoding;
+
+/**
+ * The compression codec used to compress/de-compress the files on HDFS.
+ */
+private Compression compression;
+
+/**
+ * The Kerberos user principal account to use for authentication.
+ */
+private String kerberosUserPrincipal;
+
+/**
+ * The full pathname to the Kerberos keytab file to use for authentication.
+ */
+private String keytab;
+
+public void validate() {
+if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
```

Review comment:
The comment for `hdfsConfigResources` indicates that if it is unset, we look in certain default places. So it looks like this check shouldn't be there?
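The review thread on `AbstractHdfsConfig.validate()` settles on keeping `hdfsConfigResources` mandatory, since the sink cannot rely on Hadoop client configuration files being on the worker's classpath. A minimal sketch of that check, together with splitting the comma-separated resource list, is shown below. `HdfsConfigSketch` is hypothetical: it drops Lombok and commons-lang, keeps only the two fields the check touches, and its error message is illustrative, not the PR's. In a real sink, each resulting path would typically be handed to Hadoop's `Configuration.addResource(Path)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for AbstractHdfsConfig: only the two
// fields involved in the required-property check are kept.
class HdfsConfigSketch {
    String hdfsConfigResources; // e.g. "/etc/hadoop/core-site.xml,/etc/hadoop/hdfs-site.xml"
    String directory;

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Both settings are required: the sink may run on a node without
    // Hadoop client config files, so it does not fall back to the classpath.
    void validate() {
        if (isEmpty(hdfsConfigResources) || isEmpty(directory)) {
            throw new IllegalArgumentException(
                "hdfsConfigResources and directory must both be set");
        }
    }

    // Splits the comma-separated list into individual resource paths,
    // trimming whitespace and skipping empty entries.
    List<String> configResourcePaths() {
        List<String> paths = new ArrayList<>();
        if (isEmpty(hdfsConfigResources)) {
            return paths;
        }
        for (String part : hdfsConfigResources.split(",")) {
            String p = part.trim();
            if (!p.isEmpty()) {
                paths.add(p);
            }
        }
        return paths;
    }
}
```

Trimming and skipping empty entries keeps a value such as `" a.xml, b.xml ,"` from producing blank resource paths.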
[GitHub] merlimat commented on issue #2450: Added Reader.HasNext in Go client
merlimat commented on issue #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416311755

run cpp tests
[GitHub] merlimat commented on issue #2449: Support compaction options in Go client
merlimat commented on issue #2449: Support compaction options in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2449#issuecomment-416311523

run integration tests
[GitHub] merlimat opened a new pull request #2453: Fixed cpp multi-topic consumer when topics are not partitioned
merlimat opened a new pull request #2453: Fixed cpp multi-topic consumer when topics are not partitioned
URL: https://github.com/apache/incubator-pulsar/pull/2453

### Motivation

The multi-topics consumer in the C++ client does not correctly handle the case where the topics are not partitioned.
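The distinction a multi-topic consumer has to draw can be sketched as follows. This is a hedged illustration, not the C++ code from #2453. It assumes that partitioned-topic metadata reports 0 partitions for a non-partitioned topic, and that partition `i` of topic `T` is named `T-partition-i`. `TopicExpansion.expand` is a hypothetical helper: a consumer that only loops over partitions would subscribe to nothing when the count is 0, so the non-partitioned case must subscribe to the topic itself.

```java
import java.util.ArrayList;
import java.util.List;

class TopicExpansion {
    // Returns the concrete topic names a multi-topic consumer would subscribe to.
    // Assumptions: numPartitions == 0 means "not partitioned", and partition i
    // of topic T is named T + "-partition-" + i.
    static List<String> expand(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        if (numPartitions <= 0) {
            // Non-partitioned topic: subscribe to the topic itself.
            names.add(topic);
        } else {
            // Partitioned topic: one sub-consumer per partition.
            for (int i = 0; i < numPartitions; i++) {
                names.add(topic + "-partition-" + i);
            }
        }
        return names;
    }
}
```

For example, `expand("t", 2)` yields `t-partition-0` and `t-partition-1`, while `expand("t", 0)` yields just `t`.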
[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053077

File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java

```diff
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List getFields();
```

Review comment:
I didn't add getAsString or getAsLong in this PR, because it is better to have primitive schemas (e.g. int, long, double, float, string, boolean) first. This generic interface allows unblocking the JDBC connector first.
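To see why a field list alone can unblock a JDBC sink, here is a minimal sketch that derives a parameterized `INSERT` statement from a record's fields. `SketchField`, `SketchRecord`, and `JdbcInsertSketch` are hypothetical stand-ins for `Field` and `GenericRecord` (the real `Field` also carries a `Schema`), and this is not the connector's code. Only the column list and `?` placeholders come from the fields; binding the actual values would still need typed accessors such as the deferred `getAsString`/`getAsLong`.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-in for Field: name and index only (the real one also has a Schema).
class SketchField {
    final String name;
    final int index;

    SketchField(String name, int index) {
        this.name = name;
        this.index = index;
    }
}

// Hypothetical stand-in for GenericRecord, exposing just the field list.
interface SketchRecord {
    List<SketchField> getFields();
}

class JdbcInsertSketch {
    // Builds "INSERT INTO <table> (c1, c2, ...) VALUES (?, ?, ...)" from the
    // record's fields; values are bound later through the placeholders.
    static String insertSql(String table, SketchRecord record) {
        StringJoiner cols = new StringJoiner(", ");
        StringJoiner marks = new StringJoiner(", ");
        for (SketchField f : record.getFields()) {
            cols.add(f.name);
            marks.add("?");
        }
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")";
    }
}
```

For a record with fields `id` and `name`, `insertSql("users", record)` returns `INSERT INTO users (id, name) VALUES (?, ?)`.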
[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053108

File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java

```diff
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a record, consisting of a field name, index, and
+ * {@link org.apache.pulsar.client.api.Schema} for the field value.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+private final String name;
```

Review comment:
Will do.
[GitHub] sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
sijie commented on a change in pull request #2452: [Schema] Provide a generic record interface for representing a typed message
URL: https://github.com/apache/incubator-pulsar/pull/2452#discussion_r213053077

File path: pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java

```diff
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * An interface represents a message with schema.
+ */
+public interface GenericRecord {
+
+/**
+ * Returns the list of fields associated with the record.
+ *
+ * @return the list of fields associated with the record.
+ */
+List getFields();
```

Review comment:
I didn't add getAsString or getAsLong in this PR, because it is better to have primitive schemas (e.g. int, long, double, float, string, boolean). This generic interface allows unblocking the JDBC connector first.
[GitHub] sijie commented on issue #2409: Added HDFS Sink
sijie commented on issue #2409: Added HDFS Sink
URL: https://github.com/apache/incubator-pulsar/pull/2409#issuecomment-416303592

@srkukarni @jerrypeng can you guys also review this PR?
[GitHub] merlimat commented on issue #2447: Added producer/consumer properties in Go client
merlimat commented on issue #2447: Added producer/consumer properties in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2447#issuecomment-416292701

run java8 tests
[GitHub] merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213042192

File path: pulsar-client-cpp/python/pulsar_test.py

```diff
@@ -650,6 +650,110 @@ def _v2_topics(self, url):
 client.close()
+def test_topics_consumer(self):
+client = Client(self.serviceUrl)
+topic1 = 'persistent://sample/standalone/ns/my-python-topics-consumer-1'
+topic2 = 'persistent://sample/standalone/ns/my-python-topics-consumer-2'
+topic3 = 'persistent://sample/standalone/ns/my-python-topics-consumer-3'
+topics = [topic1, topic2, topic3]
+
+url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions'
+url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions'
+url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions'
+
+doHttpPut(url1, '2')
+doHttpPut(url2, '3')
+doHttpPut(url3, '4')
```

Review comment:
As I mentioned in #2448, I think the multi-topic consumer in C++ is currently not working when there are no partitions in the topic.
[GitHub] merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
merlimat commented on a change in pull request #2451: Issue 2312: add python client multi-topics consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451#discussion_r213041782

File path: pulsar-client-cpp/python/pulsar/__init__.py

```diff
@@ -503,6 +503,211 @@ def my_listener(consumer, message):
 self._consumers.append(c)
 return c
+def subscribe_topics(self, topics, subscription_name,
```

Review comment:
@jiazhai I'd prefer to keep a single `subscribe()` method. In Python we can check the type of the `topic` argument and detect whether it is a string, a list, or a regex object.
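The single-entry-point idea from the review comment above, which inspects the runtime type of `topic` instead of adding `subscribe_topics()`, can be sketched in Java with hypothetical names. `SubscribeDispatch.route` only reports which mode it would pick and does not touch any Pulsar client API. A Python `subscribe()` would make the same three-way decision with `isinstance` checks.

```java
import java.util.Collection;
import java.util.regex.Pattern;

class SubscribeDispatch {
    // Hypothetical: one entry point that picks the subscription mode from the
    // runtime type of `topic`, returning a short description of the choice.
    static String route(Object topic) {
        if (topic instanceof String) {
            return "single:" + topic;                          // one topic name
        } else if (topic instanceof Collection) {
            return "list:" + ((Collection<?>) topic).size();   // explicit topic list
        } else if (topic instanceof Pattern) {
            return "regex:" + ((Pattern) topic).pattern();     // topics matching a pattern
        }
        throw new IllegalArgumentException("Unsupported topic argument: " + topic);
    }
}
```

In Java itself, overloaded `subscribe(String)`, `subscribe(List<String>)`, and `subscribe(Pattern)` methods would be the more idiomatic choice. The runtime check shown here is closer to what a dynamically typed Python method does.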
[GitHub] merlimat commented on issue #2450: Added Reader.HasNext in Go client
merlimat commented on issue #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450#issuecomment-416289232

run cpp tests