[GitHub] yush1ga commented on issue #1352: Delete inactive subscriptions automatically
yush1ga commented on issue #1352: Delete inactive subscriptions automatically URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-379975158 @merlimat @nkurihar I added `lastActive` to `ManagedCursor`. It records the last time a consumer connected to or disconnected from the subscription, and that timestamp is used to decide when an inactive subscription can be deleted. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
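The idea in the comment above can be sketched roughly as follows. This is a minimal illustration only: `SubscriptionActivity`, `touch`, and `isExpired` are hypothetical names, not the actual `ManagedCursor` API, and the expiry rule (no connected consumers plus an idle period longer than a limit) is an assumption about how `lastActive` might be used.

```java
import java.time.Duration;

// Hypothetical sketch; these names are NOT part of ManagedCursor.
final class SubscriptionActivity {
    // Timestamp (ms) of the last consumer connect or disconnect event.
    private volatile long lastActiveMillis;

    SubscriptionActivity(long nowMillis) {
        this.lastActiveMillis = nowMillis;
    }

    // Call whenever a consumer connects to or disconnects from the subscription.
    void touch(long nowMillis) {
        lastActiveMillis = nowMillis;
    }

    // Assumed rule: expired only if nobody is connected and the idle time exceeds the limit.
    boolean isExpired(long nowMillis, int connectedConsumers, Duration maxInactive) {
        return connectedConsumers == 0
                && nowMillis - lastActiveMillis > maxInactive.toMillis();
    }

    public static void main(String[] args) {
        SubscriptionActivity activity = new SubscriptionActivity(0L);
        System.out.println(activity.isExpired(6_000L, 0, Duration.ofSeconds(5)));
    }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real implementation would more likely read the system clock.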
[GitHub] merlimat closed pull request #1529: Fix delete topic check and provide better error message.
merlimat closed pull request #1529: Fix delete topic check and provide better error message. URL: https://github.com/apache/incubator-pulsar/pull/1529

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 03a435d9c5..863272d5c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -470,8 +470,10 @@ protected void internalDeleteTopic(boolean authoritative) {
             // v2 topics have a global name so check if the topic is replicated.
             if (topic.isReplicated()) {
                 // Delete is disallowed on global topic
-                log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), topicName);
-                throw new RestException(Status.FORBIDDEN, "Delete forbidden on global namespace");
+                final List clusters = topic.getReplicators().keys();
+                log.error("[{}] Delete forbidden topic {} is replicated on clusters {}",
+                        clientAppId(), topicName, clusters);
+                throw new RestException(Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " + clusters);
             }
 
             try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2d63fb6d11..c34ec84ba8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1507,7 +1507,7 @@ public boolean isEncryptionRequired() {
 
     @Override
     public boolean isReplicated() {
-        return replicators.size() > 1;
+        return !replicators.isEmpty();
     }
 
     public CompletableFuture terminate() {
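The effect of the `isReplicated()` change in the diff above can be seen in a small sketch. It assumes a plain `java.util.Map` as a stand-in for the broker's replicator map, and the cluster name `us-west` is made up; `ReplicationCheck` and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the check in PersistentTopic; not the broker code itself.
final class ReplicationCheck {
    // Old check: a topic with exactly one replicator was reported as not replicated.
    static boolean isReplicatedOld(Map<String, ?> replicators) {
        return replicators.size() > 1;
    }

    // New check: any replicator at all counts as replicated.
    static boolean isReplicatedNew(Map<String, ?> replicators) {
        return !replicators.isEmpty();
    }

    // Mirrors the shape of the new error message, which lists the clusters involved.
    static String deleteForbiddenMessage(Map<String, ?> replicators) {
        return "Delete forbidden topic is replicated on clusters " + replicators.keySet();
    }

    public static void main(String[] args) {
        Map<String, Object> replicators = new TreeMap<>();
        replicators.put("us-west", new Object()); // made-up cluster name
        System.out.println(isReplicatedOld(replicators));
        System.out.println(isReplicatedNew(replicators));
        System.out.println(deleteForbiddenMessage(replicators));
    }
}
```

With a single entry in the map the two checks disagree, which is the case the patch changes.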
[incubator-pulsar] branch master updated: Fix delete topic check and provide better error message. (#1529)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new dc9a9dc  Fix delete topic check and provide better error message. (#1529)
dc9a9dc is described below

commit dc9a9dc808dcea35d5570ad5c6d8d22183ce0a7d
Author: cckellogg
AuthorDate: Mon Apr 9 19:23:32 2018 -0700

    Fix delete topic check and provide better error message. (#1529)

    * Fix delete topic check and provide better error message.
    * Update error message.
    * Update log to match exception.
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ++++--
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 03a435d..863272d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -470,8 +470,10 @@ public class PersistentTopicsBase extends AdminResource {
             // v2 topics have a global name so check if the topic is replicated.
             if (topic.isReplicated()) {
                 // Delete is disallowed on global topic
-                log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), topicName);
-                throw new RestException(Status.FORBIDDEN, "Delete forbidden on global namespace");
+                final List clusters = topic.getReplicators().keys();
+                log.error("[{}] Delete forbidden topic {} is replicated on clusters {}",
+                        clientAppId(), topicName, clusters);
+                throw new RestException(Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " + clusters);
             }
 
             try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2d63fb6..c34ec84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1507,7 +1507,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
     @Override
     public boolean isReplicated() {
-        return replicators.size() > 1;
+        return !replicators.isEmpty();
     }
 
     public CompletableFuture terminate() {

--
To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] srkukarni commented on issue #1534: Implement Cassandra Sink
srkukarni commented on issue #1534: Implement Cassandra Sink URL: https://github.com/apache/incubator-pulsar/pull/1534#issuecomment-379947340 @sijie @merlimat
[GitHub] srkukarni opened a new pull request #1534: Implement Cassandra Sink
srkukarni opened a new pull request #1534: Implement Cassandra Sink URL: https://github.com/apache/incubator-pulsar/pull/1534

### Motivation
Using the Pulsar Connect Sink interface, implemented a simple Cassandra sink.

### Modifications
Describe the modifications you've done.

### Result
After your change, what will change.
[GitHub] jerrypeng commented on issue #1531: remove unnecessary proto def in FunctionsConfig
jerrypeng commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379946725 I am OK with users specifying functions via FQFN, but FunctionConfig already has tenant, namespace, and name fields. Adding another field, fqfn, would be redundant and would create multiple sources of truth, which can easily result in bugs later on.
[GitHub] sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig
sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379943562 Currently `FunctionConfig` is used for two things: 1) it is stored as part of function metadata, and 2) it is used to load the YAML config. That means this question has two parts: 1) do we need FQFN in function metadata, and 2) do we want users to be able to configure FQFN in the YAML file? I think it is clear that we don't want to store FQFN as part of function metadata (which I think was the original purpose of this change). As for the second question, I think FQFN might be convenient on the command line, but I am not so sure about the YAML file.
[GitHub] zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic
zhaijack commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic URL: https://github.com/apache/incubator-pulsar/pull/1527#issuecomment-379942086 @merlimat, thanks. From the test logs, acquiring ownership does not seem to take too much time, but loadNamespaceTopics takes some time. The linked list is a concurrent one and seems workable in the tests; maybe there is some race condition that was not considered.
[GitHub] sijie opened a new pull request #1533: Refactor JavaInstanceRunnable
sijie opened a new pull request #1533: Refactor JavaInstanceRunnable URL: https://github.com/apache/incubator-pulsar/pull/1533

*Motivation*
EffectivelyOnce processing made `JavaInstanceRunnable` very complicated. There is a lot of branching logic, which makes the code hard to maintain.

*Solution*
Abstract the logic related to processing guarantees into a `MessageProcessor` interface. Implement `at-most-once`, `at-least-once` and `effectively-once` processors.

*Result*
After this change, `JavaInstanceRunnable` is much cleaner. The `At-Least-Once` and `At-Most-Once` logic is much simpler and easier to debug.
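The refactoring described above is a strategy-style split, which might look something like the sketch below. Only the name `MessageProcessor` comes from the PR description; the method signature, the `ackLog` list standing in for acknowledgements, and the ack ordering shown for each guarantee are assumptions made for illustration, not the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of pulling delivery-guarantee logic behind one interface.
final class ProcessorSketch {
    interface MessageProcessor {
        // ackLog stands in for acknowledging a message back to the broker.
        String process(String messageId, String payload,
                       Function<String, String> fn, List<String> ackLog);
    }

    // At-most-once (assumed): ack first, so a failure while processing loses the message.
    static final MessageProcessor AT_MOST_ONCE = (id, payload, fn, ackLog) -> {
        ackLog.add(id);
        return fn.apply(payload);
    };

    // At-least-once (assumed): ack only after success, so a failure leads to redelivery.
    static final MessageProcessor AT_LEAST_ONCE = (id, payload, fn, ackLog) -> {
        String result = fn.apply(payload);
        ackLog.add(id);
        return result;
    };

    public static void main(String[] args) {
        List<String> acks = new ArrayList<>();
        System.out.println(AT_LEAST_ONCE.process("m1", "hello", String::toUpperCase, acks));
        System.out.println(acks);
    }
}
```

An effectively-once processor would need extra state for deduplication and is left out of the sketch.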
[GitHub] zhaijack commented on issue #1526: make connection timeout config-able for consumer and producer
zhaijack commented on issue #1526: make connection timeout config-able for consumer and producer URL: https://github.com/apache/incubator-pulsar/pull/1526#issuecomment-379941198 Thanks @merlimat, I made a mistake here. Will close this.
[GitHub] zhaijack closed pull request #1526: make connection timeout config-able for consumer and producer
zhaijack closed pull request #1526: make connection timeout config-able for consumer and producer URL: https://github.com/apache/incubator-pulsar/pull/1526

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a446dd4567..ccdfaf78fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -184,7 +184,9 @@
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
+            new Backoff(100, TimeUnit.MILLISECONDS,
+                client.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS,
+                0, TimeUnit.MILLISECONDS),
             this);
 
         grabCnx();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 92a512de2b..901a465a6e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -181,7 +181,9 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+            new Backoff(100, TimeUnit.MILLISECONDS,
+                client.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS,
+                Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
             this);
         grabCnx();
     }
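The diff above swaps the fixed 60-second upper bound passed to `Backoff` for the client's operation timeout. As a rough illustration of what a capped backoff does with such a bound, here is a minimal sketch; `CappedBackoff` is a hypothetical class that doubles the delay on each call, and it is not Pulsar's `Backoff`, whose constructor also takes a third "mandatory stop" duration not modelled here.

```java
// Hypothetical capped exponential backoff; NOT the Pulsar Backoff class.
final class CappedBackoff {
    private final long maxMs;
    private long nextMs;

    CappedBackoff(long initialMs, long maxMs) {
        this.maxMs = maxMs;
        this.nextMs = Math.min(initialMs, maxMs);
    }

    // Returns the delay to wait now, then doubles the following delay up to the cap.
    long next() {
        long current = nextMs;
        nextMs = Math.min(maxMs, nextMs * 2);
        return current;
    }

    public static void main(String[] args) {
        // 30000 is an arbitrary example cap, standing in for a configurable timeout in ms.
        CappedBackoff backoff = new CappedBackoff(100, 30_000);
        for (int i = 0; i < 5; i++) {
            System.out.println(backoff.next());
        }
    }
}
```

The cap decides how long a client may wait between reconnection attempts, which is why tying it to a configurable value changes reconnect behaviour.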
[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview
sijie commented on issue #1503: Pulsar Functions triggering overview URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379939993 retest this please
[GitHub] srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig
srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379933269 In that case, I'm now questioning the viability of having this feature. Having tenant/namespace/name AND fqn clearly pollutes/increases the surface area unnecessarily imo. @jerrypeng @sijie do you guys have thoughts on this?
[incubator-pulsar] branch master updated: Pulsar Connect (#1520)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 027b424 Pulsar Connect (#1520) 027b424 is described below commit 027b424fd914db306bc3a9223ca2b36030421ce8 Author: Sanjeev KulkarniAuthorDate: Mon Apr 9 17:08:06 2018 -0700 Pulsar Connect (#1520) * Added Pulsar Connect interfaces that define connectors that push data into pulsar and take data from pulsar * Added Twitter connector * Added hbc core version mapping * Addressed comments * Fixed build * Fixed license header --- pom.xml| 3 + pulsar-connect/core/pom.xml| 33 .../org/apache/pulsar/connect/core/PushSource.java | 52 +++ .../java/org/apache/pulsar/connect/core/Sink.java | 49 ++ pulsar-connect/pom.xml | 39 + pulsar-connect/twitter/pom.xml | 61 .../pulsar/connect/twitter/TwitterFireHose.java| 167 + .../connect/twitter/TwitterFireHoseConfig.java | 63 8 files changed, 467 insertions(+) diff --git a/pom.xml b/pom.xml index 30b3d5e..2ea3f52 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,8 @@ flexible messaging model and an intuitive client API. pulsar-log4j2-appender pulsar-functions + +pulsar-connect @@ -139,6 +141,7 @@ flexible messaging model and an intuitive client API. 
2.8.2 0.8.3 2.1.1 +2.2.0 3.4.0 diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml new file mode 100644 index 000..dea6d62 --- /dev/null +++ b/pulsar-connect/core/pom.xml @@ -0,0 +1,33 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.pulsar +pulsar-connect +2.0.0-incubating-SNAPSHOT + + + pulsar-connect-core + Pulsar Connect :: Connect + + diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java new file mode 100644 index 000..65b006b --- /dev/null +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.connect.core; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Pulsar's Push Source interface. 
PushSource read data from + * external sources(database changes, twitter firehose, etc) + * and publish to a Pulsar topic. The reason its called Push is + * because PushSources get passed a consumption Function that they + * invoke whenever they have data to be published to Pulsar. + * The lifcycle of a PushSource is to open it passing any config needed + * by it to initialize(like open network connection, authenticate, etc). + * A consumer Function is then to it which is invoked by the source whenever + * there is data to be published. Once all data has been read, one can use close + * at the end of the session to do any cleanup + */ +public interface PushSource extends AutoCloseable { +/** + * Open connector with configuration + * + * @param config initialization config + * @throws Exception IO type exceptions when opening a connector + */ +void open(final Map config) throws Exception; + +/** + * Attach a consumer function to this Source. This is invoked by the implementation + * to pass messages whenever there is data to be pushed to Pulsar. + * @param consumer + */ +void setConsumer(Function consumer); +} \ No newline at end of file diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
[GitHub] merlimat closed pull request #1520: Pulsar Connect
merlimat closed pull request #1520: Pulsar Connect URL: https://github.com/apache/incubator-pulsar/pull/1520 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index 30b3d5e5e2..2ea3f5235c 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,8 @@ flexible messaging model and an intuitive client API. pulsar-log4j2-appender pulsar-functions + +pulsar-connect @@ -139,6 +141,7 @@ flexible messaging model and an intuitive client API. 2.8.2 0.8.3 2.1.1 +2.2.0 3.4.0 diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml new file mode 100644 index 00..dea6d626cc --- /dev/null +++ b/pulsar-connect/core/pom.xml @@ -0,0 +1,33 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.pulsar +pulsar-connect +2.0.0-incubating-SNAPSHOT + + + pulsar-connect-core + Pulsar Connect :: Connect + + diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java new file mode 100644 index 00..65b006bf6f --- /dev/null +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.connect.core; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Pulsar's Push Source interface. PushSource read data from + * external sources(database changes, twitter firehose, etc) + * and publish to a Pulsar topic. The reason its called Push is + * because PushSources get passed a consumption Function that they + * invoke whenever they have data to be published to Pulsar. + * The lifcycle of a PushSource is to open it passing any config needed + * by it to initialize(like open network connection, authenticate, etc). + * A consumer Function is then to it which is invoked by the source whenever + * there is data to be published. Once all data has been read, one can use close + * at the end of the session to do any cleanup + */ +public interface PushSource extends AutoCloseable { +/** + * Open connector with configuration + * + * @param config initialization config + * @throws Exception IO type exceptions when opening a connector + */ +void open(final Mapconfig) throws Exception; + +/** + * Attach a consumer function to this Source. This is invoked by the implementation + * to pass messages whenever there is data to be pushed to Pulsar. 
+ * @param consumer + */ +void setConsumer(Function consumer); +} \ No newline at end of file diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java new file mode 100644 index 00..e22eb0f20b --- /dev/null +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for
[GitHub] lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig
lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379930880 @srkukarni I feel like FQFN should be settable via both the command line *and* YAML, or via neither. Inconsistent interfaces will likely sow a lot of unnecessary confusion.
[GitHub] srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig
srkukarni commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379929218 @lucperkins My feeling is that this should be a command-line helper rather than something in the fqn. Thus I would vote for removing it from protobuf, even if that means we need to specify tenant/namespace/name in the config file.
[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag
lucperkins commented on a change in pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180263273

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##
@@ -117,9 +115,31 @@ void processArguments() throws Exception {}
      * Function level command
      */
     @Getter
-    abstract class FunctionCommand extends NamespaceCommand {
-        @Parameter(names = "--name", description = "The function's name", required = true)
+    abstract class FunctionCommand extends BaseCommand {
+        @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
+        protected String fqfn;
+
+        @Parameter(names = "--tenant", description = "The function's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The function's namespace")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
+
+        @Override
+        void processArguments() throws Exception {

Review comment: Okay, never mind, I understand what you mean now. Fixed.
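The diff above cuts off at the start of `processArguments()`, so its body is not shown. One way such a method could reconcile `--fqfn` with `--tenant`, `--namespace`, and `--name` is sketched below. `FqfnResolver` is a hypothetical helper, and both the `tenant/namespace/name` format (taken from the `sample/ns1/my-func` example elsewhere in this thread) and the rule that the two styles cannot be mixed are assumptions.

```java
// Hypothetical helper; not the processArguments() body from the PR.
final class FqfnResolver {
    // Returns {tenant, namespace, name}, taken either from the FQFN or from the three flags.
    static String[] resolve(String fqfn, String tenant, String namespace, String name) {
        if (fqfn != null) {
            if (tenant != null || namespace != null || name != null) {
                throw new IllegalArgumentException(
                        "Use either --fqfn or --tenant/--namespace/--name, not both");
            }
            String[] parts = fqfn.split("/");
            if (parts.length != 3) {
                throw new IllegalArgumentException("FQFN must look like tenant/namespace/name");
            }
            for (String part : parts) {
                if (part.isEmpty()) {
                    throw new IllegalArgumentException("FQFN parts must not be empty");
                }
            }
            return parts;
        }
        if (tenant == null || namespace == null || name == null) {
            throw new IllegalArgumentException(
                    "Specify --fqfn or all of --tenant, --namespace and --name");
        }
        return new String[] {tenant, namespace, name};
    }

    public static void main(String[] args) {
        String[] parts = resolve("sample/ns1/my-func", null, null, null);
        System.out.println(String.join(" | ", parts));
    }
}
```

Resolving everything down to the three separate fields keeps a single source of truth, which is the concern raised in the #1531 discussion.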
[GitHub] lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig
lucperkins commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379927094

@jerrypeng As far as I can tell this *is* necessary for users to be able to specify FQFN in a function's YAML config, like so:

```yaml
jar: my-jar.jar
className: org.example.functions.MyFunction
inputs:
- persistent://sample/standalone/ns1/in
output: persistent://sample/standalone/ns1/out
fqfn: sample/ns1/my-func
```

At the moment, the `FunctionConfig` class that's generated from that Protobuf file powers the whole YAML config system. I'm not sure if that's optimal but for now it's the system that's in place.
[GitHub] sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig
sijie commented on issue #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531#issuecomment-379919691 retest this please
[GitHub] sijie closed pull request #1530: remove unused proto declaration
sijie closed pull request #1530: remove unused proto declaration URL: https://github.com/apache/incubator-pulsar/pull/1530

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 542d847c4d..ab4a726432 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -69,11 +69,6 @@ message FunctionMetaData {
     uint64 createTime = 4;
 }
 
-message Snapshot {
-    repeated FunctionMetaData functionMetaDataList = 1;
-    bytes lastAppliedMessageId = 2;
-}
-
 message Instance {
     FunctionMetaData functionMetaData = 1;
     int32 instanceId = 2;
[incubator-pulsar] branch master updated: remove unused proto declaration (#1530)
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 99bd812 remove unused proto declaration (#1530)
99bd812 is described below

commit 99bd812e000510a86fd39fca396fc058fc27f673
Author: Boyang Jerry Peng
AuthorDate: Mon Apr 9 15:58:15 2018 -0700

    remove unused proto declaration (#1530)
---
 pulsar-functions/proto/src/main/proto/Function.proto | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 542d847..ab4a726 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -69,11 +69,6 @@ message FunctionMetaData {
     uint64 createTime = 4;
 }
 
-message Snapshot {
-    repeated FunctionMetaData functionMetaDataList = 1;
-    bytes lastAppliedMessageId = 2;
-}
-
 message Instance {
     FunctionMetaData functionMetaData = 1;
     int32 instanceId = 2;

--
To stop receiving notification emails like this one, please contact si...@apache.org.
[GitHub] sijie commented on issue #1532: Support JSON in schema registry
sijie commented on issue #1532: Support JSON in schema registry URL: https://github.com/apache/incubator-pulsar/issues/1532#issuecomment-379918852 /cc @mgodave
[GitHub] sijie opened a new issue #1532: Support JSON in schema registry
sijie opened a new issue #1532: Support JSON in schema registry URL: https://github.com/apache/incubator-pulsar/issues/1532
[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag
lucperkins commented on a change in pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180254078

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##
@@ -117,9 +115,31 @@ void processArguments() throws Exception {}
      * Function level command
      */
     @Getter
-    abstract class FunctionCommand extends NamespaceCommand {
-        @Parameter(names = "--name", description = "The function's name", required = true)
+    abstract class FunctionCommand extends BaseCommand {
+        @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
+        protected String fqfn;
+
+        @Parameter(names = "--tenant", description = "The function's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The function's namespace")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
+
+        @Override
+        void processArguments() throws Exception {

Review comment: I agree, but in this case `FunctionCommand`s, `NamespaceCommand`s, and `FunctionConfigCommand`s really do require separate logic. I'm not sure I can see a way around that.
[GitHub] lucperkins commented on a change in pull request #1528: Fix for FQFN flag
lucperkins commented on a change in pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180251671

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##
@@ -117,9 +115,31 @@ void processArguments() throws Exception {}
      * Function level command
      */
     @Getter
-    abstract class FunctionCommand extends NamespaceCommand {
-        @Parameter(names = "--name", description = "The function's name", required = true)
+    abstract class FunctionCommand extends BaseCommand {
+        @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
+        protected String fqfn;
+
+        @Parameter(names = "--tenant", description = "The function's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The function's namespace")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
+
+        @Override
+        void processArguments() throws Exception {
+            if (null != fqfn) {
+                String[] fqfnArray = fqfn.split("/");
+                if (fqfnArray.length != 3) {
+                    throw new IllegalArgumentException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
+                }
+                tenant = fqfnArray[0];

Review comment: Well, in this case you can *either* set a FQFN *or* tenant + namespace + name. So I think that I'll change this to make sure users don't attempt to apply both systems.
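The either/or rule described in the review comment above could be sketched roughly as follows. This is only an illustration under stated assumptions: `FunctionNameResolver`, `FunctionName`, and `resolve` are hypothetical names that do not appear in the PR, the error messages other than the FQFN format message are invented, and requiring all three of tenant, namespace, and name when no FQFN is given is an assumption rather than something the diff confirms.

```java
/** Hypothetical helper, not part of the Pulsar code base. */
final class FunctionNameResolver {

    /** Simple holder for the three resolved name components. */
    static final class FunctionName {
        final String tenant;
        final String namespace;
        final String name;

        FunctionName(String tenant, String namespace, String name) {
            this.tenant = tenant;
            this.namespace = namespace;
            this.name = name;
        }
    }

    /**
     * Resolves a function name from either an FQFN or the three separate
     * values. Supplying both forms at once is rejected.
     */
    static FunctionName resolve(String fqfn, String tenant, String namespace, String name) {
        boolean anyPartSet = tenant != null || namespace != null || name != null;
        if (fqfn != null) {
            if (anyPartSet) {
                throw new IllegalArgumentException(
                        "Use either --fqfn or --tenant/--namespace/--name, not both");
            }
            // The -1 limit keeps trailing empty segments, so "a/b/" is rejected too.
            String[] parts = fqfn.split("/", -1);
            if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
                throw new IllegalArgumentException(
                        "Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
            }
            return new FunctionName(parts[0], parts[1], parts[2]);
        }
        // Assumption: without an FQFN, all three components must be given.
        if (tenant == null || namespace == null || name == null) {
            throw new IllegalArgumentException(
                    "Without --fqfn, all of --tenant, --namespace and --name are required");
        }
        return new FunctionName(tenant, namespace, name);
    }
}
```

With this shape, a call such as `resolve("t2/namespace/name", "t1", null, null)` fails fast instead of silently overwriting the tenant, which also covers the inconsistency case raised elsewhere in the review.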
[GitHub] srkukarni commented on issue #1520: Pulsar Connect
srkukarni commented on issue #1520: Pulsar Connect URL: https://github.com/apache/incubator-pulsar/pull/1520#issuecomment-379910681 retest this please
[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview
sijie commented on issue #1503: Pulsar Functions triggering overview URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379903856 @lucperkins can you merge the latest master? this PR has conflicts with master.
[GitHub] sijie commented on a change in pull request #1528: Fix for FQFN flag
sijie commented on a change in pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180238543

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##
@@ -117,9 +115,31 @@ void processArguments() throws Exception {}
      * Function level command
      */
     @Getter
-    abstract class FunctionCommand extends NamespaceCommand {
-        @Parameter(names = "--name", description = "The function's name", required = true)
+    abstract class FunctionCommand extends BaseCommand {
+        @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
+        protected String fqfn;
+
+        @Parameter(names = "--tenant", description = "The function's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The function's namespace")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
+
+        @Override
+        void processArguments() throws Exception {

Review comment: nit: it is generally good to call super.processArguments(), in case the logic in BaseCommand is modified in the future.
[GitHub] sijie commented on a change in pull request #1528: Fix for FQFN flag
sijie commented on a change in pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528#discussion_r180239050

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##
@@ -117,9 +115,31 @@ void processArguments() throws Exception {}
      * Function level command
      */
     @Getter
-    abstract class FunctionCommand extends NamespaceCommand {
-        @Parameter(names = "--name", description = "The function's name", required = true)
+    abstract class FunctionCommand extends BaseCommand {
+        @Parameter(names = "--fqfn", description = "The Fully Qualified Function Name (FQFN) for the function")
+        protected String fqfn;
+
+        @Parameter(names = "--tenant", description = "The function's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The function's namespace")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The function's name")
         protected String functionName;
+
+        @Override
+        void processArguments() throws Exception {
+            if (null != fqfn) {
+                String[] fqfnArray = fqfn.split("/");
+                if (fqfnArray.length != 3) {
+                    throw new IllegalArgumentException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
+                }
+                tenant = fqfnArray[0];

Review comment: Better to throw an exception if the tenant/namespace/function name values are inconsistent, e.g. `--tenant t1 --fqfn t2/namespace/name`.
[GitHub] sijie commented on a change in pull request #1529: Fix delete topic check and provide better error message.
sijie commented on a change in pull request #1529: Fix delete topic check and provide better error message. URL: https://github.com/apache/incubator-pulsar/pull/1529#discussion_r180237264

## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java ##
@@ -471,7 +471,8 @@ protected void internalDeleteTopic(boolean authoritative) {
         if (topic.isReplicated()) {
             // Delete is disallowed on global topic
             log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), topicName);
-            throw new RestException(Status.FORBIDDEN, "Delete forbidden on global namespace");
+            throw new RestException(Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " +

Review comment: nit: it might be good to keep the error message consistent between the exception and the logging statement above.
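One way to keep the logged text and the exception text from drifting apart, as the review comment suggests, is to build the message once and reuse it. The sketch below is only an illustration: `DeleteTopicCheck` and its methods are hypothetical, `System.err` stands in for the broker's logger, and `IllegalStateException` stands in for `RestException(Status.FORBIDDEN, ...)` so that the example needs nothing beyond the JDK.

```java
import java.util.List;

/** Hypothetical illustration, not the broker's actual code. */
final class DeleteTopicCheck {

    /** Builds the single message used by both the log line and the exception. */
    static String replicatedTopicMessage(String topicName, List<String> clusters) {
        return String.format("Delete forbidden: topic %s is replicated on clusters %s",
                topicName, clusters);
    }

    /** Rejects deletion when the topic still has replication clusters. */
    static void checkDeletable(String topicName, List<String> clusters) {
        if (!clusters.isEmpty()) {
            String message = replicatedTopicMessage(topicName, clusters);
            System.err.println(message);               // stand-in for log.error(...)
            throw new IllegalStateException(message);  // stand-in for RestException(FORBIDDEN, ...)
        }
    }
}
```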
[GitHub] jerrypeng opened a new pull request #1531: remove unnecessary proto def in FunctionsConfig
jerrypeng opened a new pull request #1531: remove unnecessary proto def in FunctionsConfig URL: https://github.com/apache/incubator-pulsar/pull/1531
[GitHub] jerrypeng opened a new pull request #1530: remove unused proto declaration
jerrypeng opened a new pull request #1530: remove unused proto declaration URL: https://github.com/apache/incubator-pulsar/pull/1530
[GitHub] merlimat closed pull request #1519: Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps
merlimat closed pull request #1519: Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps URL: https://github.com/apache/incubator-pulsar/pull/1519 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 3ee7703994..05de7fea05 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -43,12 +43,12 @@ pulsar-common ${project.version} - + com.google.guava guava - + org.apache.zookeeper zookeeper @@ -78,6 +78,11 @@ slf4j-api + + org.jctools + jctools-core + + org.mockito mockito-core @@ -107,7 +112,7 @@ - + protobuf diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 462428e1c2..443a873ca7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -38,7 +38,6 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.protobuf.InvalidProtocolBufferException; -import java.util.ArrayDeque; import java.util.Collections; import java.util.List; import java.util.Map; @@ -82,6 +81,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.commons.lang3.tuple.Pair; +import org.jctools.queues.MpmcArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +148,8 @@ public MarkDeleteEntry(PositionImpl newPosition, Mapproperties, } } -private final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); +private final MpmcArrayQueue pendingMarkDeleteOps 
= new MpmcArrayQueue<>(16); + private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); @SuppressWarnings("unused") @@ -758,14 +759,12 @@ protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCu log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name); -synchronized (pendingMarkDeleteOps) { -if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { -log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", -ledger.getName(), position, name); -resetCursorCallback.resetFailed( -new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), -position); -} +if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { +log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", ledger.getName(), +position, name); +resetCursorCallback.resetFailed( +new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), +position); } final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback; @@ -805,24 +804,20 @@ public void operationComplete() { } finally { lock.writeLock().unlock(); } -synchronized (pendingMarkDeleteOps) { -pendingMarkDeleteOps.clear(); -if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { -log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", -ledger.getName(), newPosition, name); -} + +pendingMarkDeleteOps.drain(entry -> entry.callback.markDeleteComplete(entry.ctx)); +if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { +log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", +ledger.getName(), newPosition, name); } callback.resetComplete(newPosition); - 
} @Override public void operationFailed(ManagedLedgerException exception) { -synchronized
[GitHub] cckellogg opened a new pull request #1529: Fix delete topic check and provide better error message.
cckellogg opened a new pull request #1529: Fix delete topic check and provide better error message. URL: https://github.com/apache/incubator-pulsar/pull/1529 Fix topic replication check and provide a better error message for delete topic.
[GitHub] sijie commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli)
sijie commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) URL: https://github.com/apache/incubator-pulsar/pull/1059#issuecomment-379873489 @merlimat can we move forward with this change?
[GitHub] sijie closed pull request #1137: Schema registry
sijie closed pull request #1137: Schema registry URL: https://github.com/apache/incubator-pulsar/pull/1137 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
[GitHub] sijie commented on issue #1137: Schema registry
sijie commented on issue #1137: Schema registry URL: https://github.com/apache/incubator-pulsar/pull/1137#issuecomment-379872308 awesome work! @mgodave
[GitHub] sijie commented on issue #1522: Reduce the contention on DispatcherSingleActiveConsumer by using a co…
sijie commented on issue #1522: Reduce the contention on DispatcherSingleActiveConsumer by using a co… URL: https://github.com/apache/incubator-pulsar/pull/1522#issuecomment-379871315 retest this please
[GitHub] sijie commented on issue #1520: Pulsar Connect
sijie commented on issue #1520: Pulsar Connect URL: https://github.com/apache/incubator-pulsar/pull/1520#issuecomment-379871011 @merlimat can you review this again? @srkukarni addressed your comments.
[incubator-pulsar] branch master updated: Removed contention between producers on ManagedLedger addEntry (#1521)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ad90ac6 Removed contention between producers on ManagedLedger addEntry (#1521) ad90ac6 is described below commit ad90ac6463681f7358f3f331a8440dbcfcd34258 Author: Matteo MerliAuthorDate: Mon Apr 9 12:43:01 2018 -0700 Removed contention between producers on ManagedLedger addEntry (#1521) --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++ .../org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c96b35e..f6c7e3f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -479,7 +479,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override -public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { +public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } @@ -498,6 +498,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); pendingAddEntries.add(addOperation); +// Jump to specific thread to avoid contention from writers writing from different threads +executor.executeOrdered(name, safeRun(() -> { +internalAsyncAddEntry(addOperation); +})); +} + +private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{ if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created @@ -509,7 +516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (now < lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) { // Deny the write request, since we haven't waited enough time since last attempt to create a new ledger pendingAddEntries.remove(addOperation); -callback.addFailed(new ManagedLedgerException("Waiting for new ledger creation to complete"), ctx); +addOperation.failed(new ManagedLedgerException("Waiting for new ledger creation to complete")); return; } @@ -521,7 +528,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), -config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx, +config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null, Collections.emptyMap()); } } else { @@ -531,7 +538,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { addOperation.setLedger(currentLedger); ++currentLedgerEntries; -currentLedgerSize += buffer.readableBytes(); +currentLedgerSize += addOperation.data.readableBytes(); if (log.isDebugEnabled()) { log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 6b3a03a..0dfa338 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -139,7 +139,7 @@ class OpReadEntry implements 
ReadEntriesCallback { // The reading was already completed, release resources and trigger callback cursor.readOperationCompleted(); -cursor.ledger.getExecutor().execute(safeRun(() -> { + cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); }));
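The commit above replaces a contended `synchronized` entry point with a hand-off through `executeOrdered(name, ...)`. The idea, as the inline comment in the diff puts it, is to jump to a specific thread so that writers on different threads do not fight over the same lock. A minimal JDK-only sketch of that pattern follows. It assumes that tasks submitted under the same key are pinned to one single-threaded worker; `KeyOrderedExecutor` is a hypothetical stand-in and not the executor class the managed ledger actually uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in that runs all tasks for one key on one thread, in order. */
final class KeyOrderedExecutor {
    private final ExecutorService[] workers;

    KeyOrderedExecutor(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks sharing a key land on the same worker, so they never run concurrently. */
    void executeOrdered(Object key, Runnable task) {
        int index = Math.floorMod(key.hashCode(), workers.length);
        workers[index].execute(task);
    }

    /** Stops accepting tasks and waits briefly for queued ones to finish. */
    void shutdownAndWait() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every task for a given ledger name runs on the same thread, the `synchronized` on the internal method is rarely contended by other writers; it mainly guards against the remaining callers that do not go through the ordered executor.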
[GitHub] sijie closed pull request #1521: Removed contention between producers on ManagedLedger addEntry
sijie closed pull request #1521: Removed contention between producers on ManagedLedger addEntry URL: https://github.com/apache/incubator-pulsar/pull/1521 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c96b35e665..f6c7e3ffd9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -479,7 +479,7 @@ public void asyncAddEntry(final byte[] data, int offset, int length, final AddEn } @Override -public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { +public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } @@ -498,6 +498,13 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); pendingAddEntries.add(addOperation); +// Jump to specific thread to avoid contention from writers writing from different threads +executor.executeOrdered(name, safeRun(() -> { +internalAsyncAddEntry(addOperation); +})); +} + +private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created @@ -509,7 +516,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (now < 
lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) { // Deny the write request, since we haven't waited enough time since last attempt to create a new ledger pendingAddEntries.remove(addOperation); -callback.addFailed(new ManagedLedgerException("Waiting for new ledger creation to complete"), ctx); +addOperation.failed(new ManagedLedgerException("Waiting for new ledger creation to complete")); return; } @@ -521,7 +528,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), -config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx, +config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null, Collections.emptyMap()); } } else { @@ -531,7 +538,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback addOperation.setLedger(currentLedger); ++currentLedgerEntries; -currentLedgerSize += buffer.readableBytes(); +currentLedgerSize += addOperation.data.readableBytes(); if (log.isDebugEnabled()) { log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 6b3a03adbb..0dfa33844e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -139,7 +139,7 @@ void checkReadCompletion() { // The reading was already completed, release resources and trigger callback cursor.readOperationCompleted(); -cursor.ledger.getExecutor().execute(safeRun(() -> { + cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> { 
callback.readEntriesComplete(entries, ctx); recycle(); }));
[GitHub] sijie commented on issue #1503: Pulsar Functions triggering overview
sijie commented on issue #1503: Pulsar Functions triggering overview URL: https://github.com/apache/incubator-pulsar/pull/1503#issuecomment-379870448 @srkukarni can you review this?
[GitHub] sijie closed pull request #1525: Compaction considers messages with empty payload as deleting the key
sijie closed pull request #1525: Compaction considers messages with empty payload as deleting the key URL: https://github.com/apache/incubator-pulsar/pull/1525 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 4e628bc5f8..9ee31acf42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -113,7 +113,8 @@ public static boolean isBatch(RawMessage msg) { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); -} else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) { +} else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id) + && singleMessagePayload.readableBytes() > 0) { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index fbad47ea30..2eaa8d0c9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.commons.lang3.tuple.Pair; import 
org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.Commands; @@ -122,9 +123,9 @@ private void phaseOneLoop(RawReader reader, id, ioe); } } else { -String key = extractKey(m); -if (key != null) { -latestForKey.put(key, id); +Pair keyAndSize = extractKeyAndSize(m); +if (keyAndSize != null) { +latestForKey.put(keyAndSize.getLeft(), id); } } @@ -214,10 +215,11 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map messageToAdd = Optional.of(m); } } else { -String key = extractKey(m); -if (key == null) { // pass through messages without a key +Pair keyAndSize = extractKeyAndSize(m); +if (keyAndSize == null) { // pass through messages without a key messageToAdd = Optional.of(m); -} else if (latestForKey.get(key).equals(id)) { +} else if (latestForKey.get(keyAndSize.getLeft()).equals(id) + && keyAndSize.getRight() > 0) { messageToAdd = Optional.of(m); } else { m.close(); @@ -307,11 +309,11 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map return bkf; } -private static String extractKey(RawMessage m) { +private static Pair extractKeyAndSize(RawMessage m) { ByteBuf headersAndPayload = m.getHeadersAndPayload(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); if (msgMetadata.hasPartitionKey()) { -return msgMetadata.getPartitionKey(); +return Pair.of(msgMetadata.getPartitionKey(), headersAndPayload.readableBytes()); } else { return null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index a0f0f972e4..22e74f21a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -512,4
[incubator-pulsar] branch master updated: Compaction considers messages with empty payload as deleting the key (#1525)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 951d4d3 Compaction considers messages with empty payload as deleting the key (#1525) 951d4d3 is described below commit 951d4d3805ad0b7365aa83bfb5001c6513ddff68 Author: Ivan Kelly AuthorDate: Mon Apr 9 21:39:25 2018 +0200 Compaction considers messages with empty payload as deleting the key (#1525) If the latest message with a key has an empty payload, compaction will take this to mean that the key has been deleted, so it will not be stored in the compacted topic ledger. This patch also introduces empty messages, which were not previously possible. --- .../pulsar/client/impl/RawBatchConverter.java | 3 +- .../pulsar/compaction/TwoPhaseCompactor.java | 18 +++--- .../apache/pulsar/compaction/CompactionTest.java | 68 ++ .../pulsar/client/impl/MessageBuilderImpl.java | 4 +- 4 files changed, 82 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 4e628bc..9ee31ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -113,7 +113,8 @@ public class RawBatchConverter { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); -} else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) { +} else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id) + && singleMessagePayload.readableBytes() > 0) { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); diff
--git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index fbad47e..2eaa8d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.Commands; @@ -122,9 +123,9 @@ public class TwoPhaseCompactor extends Compactor { id, ioe); } } else { -String key = extractKey(m); -if (key != null) { -latestForKey.put(key, id); +Pair keyAndSize = extractKeyAndSize(m); +if (keyAndSize != null) { +latestForKey.put(keyAndSize.getLeft(), id); } } @@ -214,10 +215,11 @@ public class TwoPhaseCompactor extends Compactor { messageToAdd = Optional.of(m); } } else { -String key = extractKey(m); -if (key == null) { // pass through messages without a key +Pair keyAndSize = extractKeyAndSize(m); +if (keyAndSize == null) { // pass through messages without a key messageToAdd = Optional.of(m); -} else if (latestForKey.get(key).equals(id)) { +} else if (latestForKey.get(keyAndSize.getLeft()).equals(id) + && keyAndSize.getRight() > 0) { messageToAdd = Optional.of(m); } else { m.close(); @@ -307,11 +309,11 @@ public class TwoPhaseCompactor extends Compactor { return bkf; } -private static String extractKey(RawMessage m) { +private static Pair extractKeyAndSize(RawMessage m) { ByteBuf headersAndPayload = m.getHeadersAndPayload();
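The rule introduced by the compaction change above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `CompactionSketch`, `Msg`, and `compact` are hypothetical names invented for illustration, and the list-based log stands in for the ledger reads done by `TwoPhaseCompactor`. Phase one records the latest message id per key; phase two keeps a keyed message only if it is the latest for its key and its payload is non-empty, while messages without a key pass through.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of two-phase compaction; not Pulsar code.
final class CompactionSketch {

    // Simplified stand-in for a raw message: id, optional key, payload bytes.
    static final class Msg {
        final long id;
        final String key;      // null means the message has no partition key
        final byte[] payload;

        Msg(long id, String key, byte[] payload) {
            this.id = id;
            this.key = key;
            this.payload = payload;
        }
    }

    static List<Msg> compact(List<Msg> log) {
        // Phase one: remember the id of the latest message seen for each key.
        Map<String, Long> latestForKey = new HashMap<>();
        for (Msg m : log) {
            if (m.key != null) {
                latestForKey.put(m.key, m.id);
            }
        }

        // Phase two: decide which messages go into the compacted output.
        List<Msg> compacted = new ArrayList<>();
        for (Msg m : log) {
            if (m.key == null) {
                // Pass through messages without a key.
                compacted.add(m);
            } else if (latestForKey.get(m.key) == m.id && m.payload.length > 0) {
                // Latest message for its key, with a non-empty payload: retain.
                compacted.add(m);
            }
            // Otherwise the message is superseded, or it is an empty-payload
            // "delete" marker for its key, and is dropped.
        }
        return compacted;
    }
}
```

In this model, publishing an empty payload as the last message for key `a` removes `a` from the compacted output entirely, which matches the behaviour the commit message describes.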
[GitHub] ivankelly commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true
ivankelly commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true URL: https://github.com/apache/incubator-pulsar/issues/1517#issuecomment-379854893 The fix is actually pretty trivial. ManagedLedger always opens the last ledger before creating a new one, so we can just move the initialization of lastConfirmedEntry to the open callback.
[incubator-pulsar] branch master updated: Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new da1a110 Use signSafeMod in RoundRobinPartitionMessageRouter (#1523) da1a110 is described below commit da1a11043c9e4591c6c5c1d8e6434ccc59b913b0 Author: Matteo Merli AuthorDate: Mon Apr 9 10:19:00 2018 -0700 Use signSafeMod in RoundRobinPartitionMessageRouter (#1523) * Use signSafeMod in RoundRobinPartitionMessageRouter * Added test with mocked clock * Fixed tests * Fixed functions test --- .../client/impl/PartitionedProducerImpl.java | 2 +- .../impl/RoundRobinPartitionMessageRouterImpl.java | 27 --- .../impl/SinglePartitionMessageRouterImpl.java | 4 +- .../org/apache/pulsar/client/util/MathUtils.java | 41 ++ .../RoundRobinPartitionMessageRouterImplTest.java | 91 ++ .../functions/instance/FunctionResultRouter.java | 9 ++- .../instance/FunctionResultRouterTest.java | 31 7 files changed, 158 insertions(+), 47 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 26f74cf..96a8254 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -168,7 +168,7 @@ public class PartitionedProducerImpl extends ProducerBase { int partition = routerPolicy.choosePartition(message, topicMetadata); checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(), -"Illegal partition index chosen by the message routing policy"); +"Illegal partition index chosen by the message routing policy: " + partition); return producers.get(partition).sendAsync(message); } diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index 87cfd1a..ee8b9a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.annotations.VisibleForTesting; +import static org.apache.pulsar.client.util.MathUtils.signSafeMod; + +import java.time.Clock; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.pulsar.client.api.HashingScheme; @@ -46,35 +48,42 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase { private final boolean isBatchingEnabled; private final long maxBatchingDelayMs; -@VisibleForTesting +private final Clock clock; + +private static final Clock SYSTEM_CLOCK = Clock.systemUTC(); + public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, -int startPtnIdx) { -this(hashingScheme, startPtnIdx, false, 0); +int startPtnIdx, +boolean isBatchingEnabled, +long maxBatchingDelayMs) { +this(hashingScheme, startPtnIdx, isBatchingEnabled, maxBatchingDelayMs, SYSTEM_CLOCK); } public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, int startPtnIdx, boolean isBatchingEnabled, -long maxBatchingDelayMs) { +long maxBatchingDelayMs, +Clock clock) { super(hashingScheme); PARTITION_INDEX_UPDATER.set(this, startPtnIdx); this.startPtnIdx = startPtnIdx; this.isBatchingEnabled = isBatchingEnabled; this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs); +this.clock = clock; } @Override public int choosePartition(Message msg, TopicMetadata topicMetadata) { // If the message has a key, it supersedes the round robin routing policy if (msg.hasKey()) { -return hash.makeHash(msg.getKey()) % 
topicMetadata.numPartitions(); +return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions()); } if (isBatchingEnabled) { // if batching is enabled, choose partition on `maxBatchingDelayMs`
[GitHub] merlimat closed pull request #1523: Use signSafeMod in RoundRobinPartitionMessageRouter
merlimat closed pull request #1523: Use signSafeMod in RoundRobinPartitionMessageRouter URL: https://github.com/apache/incubator-pulsar/pull/1523 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 26f74cf24d..96a82543d5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -168,7 +168,7 @@ public MessageId send(Message message) throws PulsarClientException { int partition = routerPolicy.choosePartition(message, topicMetadata); checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(), -"Illegal partition index chosen by the message routing policy"); +"Illegal partition index chosen by the message routing policy: " + partition); return producers.get(partition).sendAsync(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index 87cfd1a436..ee8b9a0428 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.annotations.VisibleForTesting; +import static org.apache.pulsar.client.util.MathUtils.signSafeMod; + +import java.time.Clock; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 
import org.apache.pulsar.client.api.HashingScheme; @@ -46,35 +48,42 @@ private final boolean isBatchingEnabled; private final long maxBatchingDelayMs; -@VisibleForTesting +private final Clock clock; + +private static final Clock SYSTEM_CLOCK = Clock.systemUTC(); + public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, -int startPtnIdx) { -this(hashingScheme, startPtnIdx, false, 0); +int startPtnIdx, +boolean isBatchingEnabled, +long maxBatchingDelayMs) { +this(hashingScheme, startPtnIdx, isBatchingEnabled, maxBatchingDelayMs, SYSTEM_CLOCK); } public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, int startPtnIdx, boolean isBatchingEnabled, -long maxBatchingDelayMs) { +long maxBatchingDelayMs, +Clock clock) { super(hashingScheme); PARTITION_INDEX_UPDATER.set(this, startPtnIdx); this.startPtnIdx = startPtnIdx; this.isBatchingEnabled = isBatchingEnabled; this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs); +this.clock = clock; } @Override public int choosePartition(Message msg, TopicMetadata topicMetadata) { // If the message has a key, it supersedes the round robin routing policy if (msg.hasKey()) { -return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions(); +return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions()); } if (isBatchingEnabled) { // if batching is enabled, choose partition on `maxBatchingDelayMs` boundary. 
-long currentMs = System.currentTimeMillis(); -return (((int) (currentMs / maxBatchingDelayMs)) + startPtnIdx) % topicMetadata.numPartitions(); +long currentMs = clock.millis(); +return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, topicMetadata.numPartitions()); } else { -return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % topicMetadata.numPartitions()); +return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java index 9cef8b55ae..aacbe4cda4 100644 ---
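The reason a plain `%` is replaced in the diff above is that Java's remainder operator keeps the sign of the dividend, so a negative hash (or an overflowed counter) would yield a negative partition index. The body of `MathUtils.signSafeMod` is not shown in this excerpt, so the following is only a sketch of what such a helper could look like; the class name `SignSafeModSketch` is hypothetical and the real implementation may differ.

```java
// Hypothetical sketch of a sign-safe modulo helper; not copied from Pulsar.
final class SignSafeModSketch {

    /** Returns a value in [0, divisor) for any dividend, assuming divisor > 0. */
    static int signSafeMod(long dividend, int divisor) {
        // Java's % can return a negative remainder when dividend is negative.
        int mod = (int) (dividend % divisor);
        if (mod < 0) {
            // Shift the negative remainder back into the valid range.
            mod += divisor;
        }
        return mod;
    }
}
```

For example, `-7 % 3` evaluates to `-1` in Java, whereas `signSafeMod(-7, 3)` in this sketch returns `2`, which is a valid index into three partitions.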
[GitHub] lucperkins opened a new pull request #1528: Fix for FQFN flag
lucperkins opened a new pull request #1528: Fix for FQFN flag URL: https://github.com/apache/incubator-pulsar/pull/1528 I realized when trying to create a function that the `--fqfn` flag was added to the wrong command base. It doesn't make sense to make it available for namespace-level commands. Only function-level commands should have it.
[GitHub] merlimat commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic
merlimat commented on issue #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic URL: https://github.com/apache/incubator-pulsar/pull/1527#issuecomment-379817560 @zhaijack I need to look more closely into this change, though I think it looks good. I believe that since adding to the ownership map and adding to the linked list are not atomic, some operations might still be done multiple times. One other option could be to convert the `ownedBundlesCache` in `OwnershipCache` to use a `ConcurrentHashMap`. That's the same trick we use to load topics, ensuring only the first attempt triggers the load and subsequent attempts just piggyback on it.
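The piggyback trick mentioned in the comment above can be sketched roughly as follows. This is an assumption-laden illustration, not the `OwnershipCache` code: `PiggybackLoaderSketch` and its `get` method are hypothetical names, and storing `CompletableFuture` values in the map is an assumption about how the trick works. The idea is that `ConcurrentHashMap.computeIfAbsent` runs the loader at most once per key, so concurrent callers for the same key all receive the same in-flight future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper: deduplicates concurrent loads of the same key.
final class PiggybackLoaderSketch<K, V> {

    private final ConcurrentHashMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> loader;

    PiggybackLoaderSketch(Function<K, CompletableFuture<V>> loader) {
        // The loader should only start the asynchronous work and return quickly,
        // because computeIfAbsent invokes it while the map entry is locked.
        this.loader = loader;
    }

    CompletableFuture<V> get(K key) {
        // Only the first caller for a key triggers the load; later callers
        // get the same future back and simply wait on its completion.
        return inFlight.computeIfAbsent(key, loader);
    }
}
```

A real cache would also need to decide what to do with failed futures (for instance, removing them so that a later attempt can retry); that is left out of this sketch.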
[GitHub] merlimat commented on issue #1526: make connection timeout config-able for consumer and producer
merlimat commented on issue #1526: make connection timeout config-able for consumer and producer URL: https://github.com/apache/incubator-pulsar/pull/1526#issuecomment-379815567 @zhaijack That's not exactly a "timeout" though. That would be the maximum backoff time. The `Backoff` class has the exponential time increase logic. It starts at 100ms and keeps doubling the retry time between attempts until reaching 1min. After that it keeps trying to reconnect every 1min.
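The reconnection schedule described in the comment above (start at 100ms, double on each attempt, cap at 1min) can be sketched as below. `BackoffSketch` and its `next()` method are hypothetical names used only for illustration; this is not the API of Pulsar's `Backoff` class, which may differ in its details.

```java
// Hypothetical capped exponential backoff; not Pulsar's Backoff class.
final class BackoffSketch {

    private final long maxMs;
    private long nextMs;

    BackoffSketch(long initialMs, long maxMs) {
        this.nextMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Returns the delay to wait before the next reconnection attempt. */
    long next() {
        long current = nextMs;
        // Double the delay for the following attempt, never exceeding the cap.
        nextMs = Math.min(nextMs * 2, maxMs);
        return current;
    }
}
```

With `new BackoffSketch(100, 60_000)`, successive calls to `next()` yield 100, 200, 400, and so on up to 51200, and every call after that yields 60000. The 60 second value is therefore a ceiling on the wait between attempts, not a limit on how long reconnection is attempted.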
[GitHub] zhaijack commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true
zhaijack commented on issue #1517: Reader#hasMessageAvailable can report false when it should be true URL: https://github.com/apache/incubator-pulsar/issues/1517#issuecomment-379806318 Thanks @ivankelly, it seems this situation was not considered before.
[GitHub] zhaijack opened a new pull request #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic
zhaijack opened a new pull request #1527: Reduce un-necessary work when doing bundle lookup for partitioned topic URL: https://github.com/apache/incubator-pulsar/pull/1527 ### Motivation For a partitioned topic, when producers or consumers are created for each partition concurrently, each of them calls `pularclient.getConnection`, which eventually calls into `namespace.searchForCandidateBroker` to acquire the target bundle. If this bundle contains more than one partitioned topic, each of them runs the bundle-related handling (acquire ownership, loadNamespaceTopics), which wastes resources. This change lets the first partition topic do the real work once, and the other concurrent topics piggyback on it. e.g. ``` Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 01:43:25.301 [pulsar-1-17] INFO org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership of benchmark/ local/ns-CeV_hcU/0x1400_0x1800 . . Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 01:43:25.301 [pulsar-1-10] INFO org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership of benchmark/ local/ns-CeV_hcU/0x1400_0x1800 Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 01:43:25.301 [pulsar-1-15] INFO org.apache.pulsar.broker.namespace.OwnershipCache - Trying to acquire ownership of benchmark/ local/ns-CeV_hcU/0x1400_0x1800 Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 01:43:25.304 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.pulsar.broker.namespace.OwnershipCache -Successfully acquired ownership of /namespace/benchmark/local/ns-CeV_hcU/0x1400_0x1800 Apr 05 01:43:25 ip-10-0-0-145.us-west-2.compute.internal pulsar[12989]: 01:43:25.304 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.pulsar.broker.namespace.OwnershipCache -Successfully acquired ownership of /namespace/benchmark/local/ns-CeV_hcU/0x1400_0x1800 .. 
``` ### Modifications The first partition topic does the real work only once, and the other concurrent topics piggyback on it. ### Result `getConnection` for partitioned consumers and producers becomes more efficient.
[GitHub] mgodave commented on issue #1137: Schema registry
mgodave commented on issue #1137: Schema registry URL: https://github.com/apache/incubator-pulsar/pull/1137#issuecomment-379797837 All changes are merged; this tracking ticket can be closed or merged.
[GitHub] zhaijack opened a new pull request #1526: make connection timeout config-able for consumer and producer
zhaijack opened a new pull request #1526: make connection timeout config-able for consumer and producer URL: https://github.com/apache/incubator-pulsar/pull/1526 ### Motivation In ConsumerImpl and ProducerImpl, the max timeout for connection is hard-coded as 60 seconds. This change makes it configurable, using the value of client.configuration.getOperationTimeoutMs. ### Modifications Replace the hard-coded 60s with client.configuration.getOperationTimeoutMs. ### Result The connection timeout becomes configurable.
[GitHub] ivankelly commented on issue #1513: Managed ledger uses ReadHandle in read path
ivankelly commented on issue #1513: Managed ledger uses ReadHandle in read path URL: https://github.com/apache/incubator-pulsar/pull/1513#issuecomment-379789804 retest this please // BasicEndToEndTest.testStatsLatencies
[GitHub] maskit commented on issue #1266: Pass all Apache Podling Website Checks
maskit commented on issue #1266: Pass all Apache Podling Website Checks URL: https://github.com/apache/incubator-pulsar/issues/1266#issuecomment-379776346 I got an email on another project, and it encourages us to add an ApacheCon promotion to our project site. http://apache.org/events/README.txt I think changing the URL for *our* current-event-page to an absolute URL is not a good idea because it doesn't work on a local copy, but adding another link for ApacheCon as an absolute URL would be a good way to make the check result green. The logo will be updated automatically and will show the next event, so we can put the logo on our site permanently (no need to put, remove, put, remove, ...). Where should we put the logo? While I think it should be above the fold, I don't want to break the current cool design.
[GitHub] ivankelly commented on issue #1525: Compaction considers messages with empty payload as deleting the key
ivankelly commented on issue #1525: Compaction considers messages with empty payload as deleting the key URL: https://github.com/apache/incubator-pulsar/pull/1525#issuecomment-379761575 retest this please // 3 flakes // BrokerServiceLookupTest.testModularLoadManagerSplitBundle // BrokerClientIntegrationTest.testResetCursor // PatternTopicsConsumerImplTest.testAutoUnbubscribePatternConsumer
[GitHub] ivankelly commented on a change in pull request #1513: Managed ledger uses ReadHandle in read path
ivankelly commented on a change in pull request #1513: Managed ledger uses ReadHandle in read path URL: https://github.com/apache/incubator-pulsar/pull/1513#discussion_r180074490 ## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java ## @@ -239,43 +243,44 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo } // Read all the entries from bookkeeper -lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) -> { - -if (rc != BKException.Code.OK) { -if (rc == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(createManagedLedgerException(rc), ctx); -} else { -ml.invalidateLedgerHandle(lh1, rc); -ManagedLedgerException mlException = createManagedLedgerException(rc); -callback.readEntriesFailed(mlException, ctx); -} -return; -} - -checkNotNull(ml.getName()); -checkNotNull(ml.getExecutor()); -ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { -// We got the entries, we need to transform them to a List<> type -long totalSize = 0; -final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); -while (sequence.hasMoreElements()) { -// Insert the entries at the end of the list (they will be unsorted for now) -LedgerEntry ledgerEntry = sequence.nextElement(); -EntryImpl entry = EntryImpl.create(ledgerEntry); -ledgerEntry.getEntryBuffer().release(); - -entriesToReturn.add(entry); - -totalSize += entry.getLength(); - -} - - manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); -ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); - -callback.readEntriesComplete((List) entriesToReturn, ctx); -})); -}, callback); +lh.readAsync(firstEntry, lastEntry).whenComplete( +(ledgerEntries, exception) -> { +if (exception != null) { +if (exception instanceof BKException +&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) { + 
callback.readEntriesFailed(createManagedLedgerException(exception), ctx); +} else { +ml.invalidateLedgerHandle(lh, exception); +ManagedLedgerException mlException = createManagedLedgerException(exception); +callback.readEntriesFailed(mlException, ctx); +} +return; +} + +checkNotNull(ml.getName()); +checkNotNull(ml.getExecutor()); +ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { Review comment: Didn't occur to me when I was first doing it, but done now.
[incubator-pulsar] branch asf-site updated: Updated site at revision 8ad606b
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/asf-site by this push: new d3a6fac Updated site at revision 8ad606b d3a6fac is described below commit d3a6facc7fc84f5c8a05035ddfb82039ed8ae6f7 Author: jenkinsAuthorDate: Mon Apr 9 08:22:22 2018 + Updated site at revision 8ad606b --- content/docs/latest/adaptors/PulsarSpark/index.html| 2 +- content/docs/latest/adaptors/PulsarStorm/index.html| 2 +- content/docs/latest/admin-api/overview/index.html | 8 content/docs/latest/admin/Authz/index.html | 4 ++-- content/docs/latest/clients/Java/index.html| 2 +- content/docs/latest/cookbooks/PartitionedTopics/index.html | 2 +- content/docs/latest/cookbooks/RetentionExpiry/index.html | 2 +- content/docs/latest/cookbooks/message-deduplication/index.html | 2 +- content/docs/latest/deployment/aws-cluster/index.html | 2 +- content/docs/latest/deployment/cluster/index.html | 2 +- content/docs/latest/deployment/instance/index.html | 2 +- content/docs/latest/reference/CliTools/index.html | 4 ++-- 12 files changed, 17 insertions(+), 17 deletions(-) diff --git a/content/docs/latest/adaptors/PulsarSpark/index.html b/content/docs/latest/adaptors/PulsarSpark/index.html index 91d64a2..05093a7 100644 --- a/content/docs/latest/adaptors/PulsarSpark/index.html +++ b/content/docs/latest/adaptors/PulsarSpark/index.html @@ -1421,9 +1421,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/adaptors/PulsarStorm/index.html b/content/docs/latest/adaptors/PulsarStorm/index.html index ef328f8..bdcde4e 100644 --- a/content/docs/latest/adaptors/PulsarStorm/index.html +++ b/content/docs/latest/adaptors/PulsarStorm/index.html @@ -1211,9 +1211,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/admin-api/overview/index.html 
b/content/docs/latest/admin-api/overview/index.html index 3cf7922..b5f413b 100644 --- a/content/docs/latest/admin-api/overview/index.html +++ b/content/docs/latest/admin-api/overview/index.html @@ -1009,9 +1009,9 @@ - The Pulsar admin interface + The Pulsar admin interface @@ -1221,9 +1221,9 @@ - The Pulsar admin interface + The Pulsar admin interface @@ -1431,9 +1431,9 @@ - The Pulsar admin interface + The Pulsar admin interface @@ -1649,9 +1649,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/admin/Authz/index.html b/content/docs/latest/admin/Authz/index.html index 7879561..bd75e25 100644 --- a/content/docs/latest/admin/Authz/index.html +++ b/content/docs/latest/admin/Authz/index.html @@ -1009,9 +1009,9 @@ - The Pulsar admin interface + The Pulsar admin interface @@ -2067,9 +2067,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/clients/Java/index.html b/content/docs/latest/clients/Java/index.html index 4ccd584..edb9c89 100644 --- a/content/docs/latest/clients/Java/index.html +++ b/content/docs/latest/clients/Java/index.html @@ -1217,9 +1217,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/cookbooks/PartitionedTopics/index.html b/content/docs/latest/cookbooks/PartitionedTopics/index.html index f26ae01..39301b5 100644 --- a/content/docs/latest/cookbooks/PartitionedTopics/index.html +++ b/content/docs/latest/cookbooks/PartitionedTopics/index.html @@ -1429,9 +1429,9 @@ - The Pulsar admin interface + The Pulsar admin interface diff --git a/content/docs/latest/cookbooks/RetentionExpiry/index.html