[GitHub] jiazhai removed a comment on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster
jiazhai removed a comment on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster URL: https://github.com/apache/incubator-pulsar/pull/2560#issuecomment-421236834 run integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jiazhai commented on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster
jiazhai commented on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster URL: https://github.com/apache/incubator-pulsar/pull/2560#issuecomment-421236834 run integration tests
[GitHub] sijie commented on issue #2508: PIP-22: Dead Letter Topic
sijie commented on issue #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#issuecomment-421236785 run java8 tests run integration tests
[GitHub] sijie commented on issue #2571: Change repository tag for test image
sijie commented on issue #2571: Change repository tag for test image URL: https://github.com/apache/incubator-pulsar/pull/2571#issuecomment-421236455 retest this please
[GitHub] sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#issuecomment-421231882

> --topic-pattern = Input Topic Pattern with default schema

Hmm, I don't believe so. Let me find the original pull request.
[GitHub] sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#issuecomment-421232586

Looking into the refactor. I believe the refactor messed this up: https://github.com/apache/incubator-pulsar/blob/59df98bd5675fa17cc858fa2a0c5cac45475a5f4/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java#L628
[GitHub] sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#issuecomment-421232435

https://github.com/apache/incubator-pulsar/pull/1903/files#diff-cae1c6ad6356263a1292cfcb9fddc015R110

If --topic-pattern is specified, it will look up --custom-serde-inputs; if no serde is specified there, it will use the default serde.
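The lookup described in this comment can be sketched roughly as follows. This is a minimal illustration and not the code from CmdFunctions.java: the class name, the method `serdeFor`, and the `DEFAULT_SERDE` placeholder are all hypothetical, and the map simply stands in for whatever `--custom-serde-inputs` is parsed into.

```java
import java.util.Collections;
import java.util.Map;

class SerdeLookupSketch {
    // Hypothetical placeholder: the real default serde class name is not given in this thread.
    static final String DEFAULT_SERDE = "<default-serde>";

    // Returns the custom serde registered for the topic or pattern,
    // falling back to the default when no entry exists.
    static String serdeFor(String topicOrPattern, Map<String, String> customSerdeInputs) {
        return customSerdeInputs.getOrDefault(topicOrPattern, DEFAULT_SERDE);
    }

    public static void main(String[] args) {
        // Assumed example values, for illustration only.
        Map<String, String> custom =
                Collections.singletonMap("persistent://public/default/in-.*", "com.example.MySerde");
        System.out.println(serdeFor("persistent://public/default/in-.*", custom));
        System.out.println(serdeFor("persistent://public/default/other", custom));
    }
}
```

Whether the key is a topic name or a topic pattern is exactly what the rest of this thread disputes, so the sketch deliberately treats the key as an opaque string.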
[GitHub] sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#issuecomment-421231232

@srkukarni :

```
@Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)", hidden = true)
protected String DEPRECATED_topicsPattern;

@Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)")
protected String topicsPattern;
```

unless I misunderstand the comment here ... and here https://github.com/apache/incubator-pulsar/blob/59df98bd5675fa17cc858fa2a0c5cac45475a5f4/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java#L628
[GitHub] pablichjenkov opened a new issue #2581: Android and IOS client support
pablichjenkov opened a new issue #2581: Android and IOS client support URL: https://github.com/apache/incubator-pulsar/issues/2581

I am trying to build a chat app in order to test Apache Pulsar, and I am facing the following problems on both mobile platforms.

Android: I tried to run the Java client on Android without success. The pure Java compilation phase passes; however, later in the Android build all .class files are transformed to .dex files to run on the Android VM (ART). During this process I got errors complaining that some classes were either duplicated or corrupted. On top of that, there is still the risk that a ClassLoader fails to resolve some JAR dependency class at run time.

iOS: No client at all.

Given these limitations, I decided to create my own REST clients. I am able to run the Pulsar server application in my local environment and send/receive messages using the pulsar-client program. However, I have not been able to connect to the server from Android. The [official documentation](https://pulsar.incubator.apache.org/docs/latest/clients/WebSocket/#Producerendpoint-r2hxu) says that in order to send a message, a client should create a WebSocket connection to the broker endpoint and send JSON-formatted text with a key named **payload**. Following that, I should be able to connect to the producer endpoint with the code below, unless I am missing something:

*The code below uses the OkHttpClient library*

` String wsUrl = "ws://192.168.1.123:6650/ws/v2/producer/persistent/public/default/my-topic"; webSocket = okHttpClient.newWebSocket(wsRequest, wsListener); `

As soon as the connection is started, the WebSocket listener invokes the failure callback method. The message below is shown in the logs by the okHttpClient:

` D/OkHttp: <-- HTTP FAILED: java.io.IOException: unexpected end of stream on Connection{192.168.1.123:6650, proxy=DIRECT hostAddress=/192.168.1.123:6650 cipherSuite=none protocol=http/1.1} `

See the [sample source code](https://github.com/pablichjenkov/Pulsar-Droid/blob/master/app/src/main/java/com/ncl/pulsarj/MainActivity.java).

**I am running a local standalone instance on a MacBook Pro and the firewall is OFF.**

Thanks in advance
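One thing that may be worth checking in this report, offered as an assumption and not as a confirmed diagnosis: in a default standalone setup, port 6650 serves Pulsar's binary protocol, while the WebSocket endpoints are served on the HTTP port (8080 by default), and the WebSocket documentation describes the `payload` field as Base64-encoded. The sketch below only builds the endpoint URL and the JSON text frame under those assumptions; the class and method names are hypothetical, and it does no networking.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class WebSocketProducerSketch {
    // Assumption: default HTTP/WebSocket port of a standalone instance.
    static final int ASSUMED_HTTP_PORT = 8080;

    // Builds the v2 producer endpoint URL in the same shape the issue uses.
    static String producerUrl(String host, int port, String tenant, String namespace, String topic) {
        return String.format("ws://%s:%d/ws/v2/producer/persistent/%s/%s/%s",
                host, port, tenant, namespace, topic);
    }

    // Builds the JSON text frame; Base64 output needs no JSON escaping.
    static String producerMessage(String text) {
        String payload = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
        return "{\"payload\":\"" + payload + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(producerUrl("192.168.1.123", ASSUMED_HTTP_PORT, "public", "default", "my-topic"));
        System.out.println(producerMessage("Hello World"));
    }
}
```

The resulting URL string could then be passed to whichever WebSocket client is in use, and the JSON string sent as a text frame once the connection opens.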
[GitHub] jerrypeng opened a new pull request #2577: fix behavior of JSONSchema for derived classes
jerrypeng opened a new pull request #2577: fix behavior of JSONSchema for derived classes URL: https://github.com/apache/incubator-pulsar/pull/2577

### Motivation

Currently, declaring a JSONSchema for a certain class and serializing an instance of a derived class does not produce the correct behavior.
[GitHub] srkukarni commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI
srkukarni commented on issue #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#issuecomment-421206316

--customSerdeMap was a map from topicName -> Serde and not topicPattern -> Serde. The topicPattern was specified through a different input parameter.
[GitHub] massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client
massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-421205105 rerun java8 tests
[GitHub] massakam removed a comment on issue #2572: Shorten the timeout value of C++ ZTS client
massakam removed a comment on issue #2572: Shorten the timeout value of C++ ZTS client URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-421204621 rerun java8 tests rerun integration tests
[GitHub] massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client
massakam commented on issue #2572: Shorten the timeout value of C++ ZTS client URL: https://github.com/apache/incubator-pulsar/pull/2572#issuecomment-421204621 rerun java8 tests rerun integration tests
[GitHub] dsambandam commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1
dsambandam commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1 URL: https://github.com/apache/incubator-pulsar/issues/2576#issuecomment-421200024 @sijie I replaced the real hostname with broker1 when I posted it.
[GitHub] rdhabalia commented on issue #2535: Add ledger op timeout to avoid topics stuck on ledger-creation
rdhabalia commented on issue #2535: Add ledger op timeout to avoid topics stuck on ledger-creation URL: https://github.com/apache/incubator-pulsar/pull/2535#issuecomment-421199228 @merlimat fixed it.
[GitHub] rdhabalia commented on a change in pull request #2535: Add ledger op timeout to avoid topics stuck on ledger-creation
rdhabalia commented on a change in pull request #2535: Add ledger op timeout to avoid topics stuck on ledger-creation URL: https://github.com/apache/incubator-pulsar/pull/2535#discussion_r217576629

## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java ##
@@ -56,6 +56,7 @@
     private boolean autoSkipNonRecoverableData;
     private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
     private long offloadAutoTriggerSizeThresholdBytes = -1;
+    private long opTimeoutSec = 5 * 60; //5 minutes

Review comment:
Sure, let me rename it.

> Also, the default in ServiceConfiguration seems to be 60sec

Sure, I can make it 60 sec, but in this PR we are setting the default to `ZooKeeperSessionTimeout (default 30sec) * 2`, as the problem mainly happens when the zk-client fails to complete a callback while the zk leader restarts.
[GitHub] aahmed-se commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar
aahmed-se commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r217573668 ## File path: pulsar-client-tools/pom.xml ## @@ -101,6 +101,18 @@ test + Review comment: Will do, I will remove them.
[GitHub] sijie commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1
sijie commented on issue #2576: Pulsar client consume command fails with java.lang.IllegalArgumentException: port out of range:-1 URL: https://github.com/apache/incubator-pulsar/issues/2576#issuecomment-421193236 @dsambandam What is your broker URL? Is it `broker1`, or did `broker1` replace the real domain name when you posted it?
[GitHub] sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r217571498

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java ##
@@ -0,0 +1,29 @@
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.schema.JsonSchema;
+
+import java.io.IOException;
+
+public class SchemaExtractor {
+
+    public static String getJsonSchema(Class clazz) throws IOException {
+
+        org.codehaus.jackson.map.ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(SerializationConfig.Feature.WRITE_ENUMS_USING_TO_STRING, true);
+
+        JsonSchema schema = mapper.generateJsonSchema(clazz);
+
+        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(schema);
+    }
+
+    public static String getAvroSchema(Class clazz) {

Review comment:
use `Schema.AVRO(class).getSchemaInfo()` to get schema
[GitHub] sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r217571566 ## File path: pulsar-client-tools/pom.xml ## @@ -101,6 +101,18 @@ test + Review comment: I don't think you need to add these manually. Just use the methods in the Schema class to get schema info.
[GitHub] sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: [WIP] Add support for schema extraction from a jar URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r217571451

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java ##
@@ -0,0 +1,29 @@
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.schema.JsonSchema;
+
+import java.io.IOException;
+
+public class SchemaExtractor {

Review comment:
use `Schema.JSON(class).getSchemaInfo()`
[GitHub] jerrypeng closed pull request #2577: fix behavior of JSONSchema for derived classes
jerrypeng closed pull request #2577: fix behavior of JSONSchema for derived classes URL: https://github.com/apache/incubator-pulsar/pull/2577

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-schema/pom.xml b/pulsar-client-schema/pom.xml
index 477627e77b..5e72f6bfeb 100644
--- a/pulsar-client-schema/pom.xml
+++ b/pulsar-client-schema/pom.xml
@@ -70,6 +70,12 @@
 jackson-module-jsonSchema
+
+com.google.code.gson
+gson
+${gson.version}
+
+
 org.apache.pulsar
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 5465f8c185..5c3354307c 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -22,13 +22,16 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import com.google.gson.ExclusionStrategy;
+import com.google.gson.FieldAttributes;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -36,14 +39,24 @@
     private final org.apache.avro.Schema schema;
     private final SchemaInfo schemaInfo;
-    private final ObjectMapper objectMapper;
+    private final Gson gson;
     private final Class pojo;
     private Map properties;
     private JSONSchema(Class pojo, Map properties) {
         this.pojo = pojo;
         this.properties = properties;
-        this.objectMapper = new ObjectMapper();
+        this.gson = new GsonBuilder().addSerializationExclusionStrategy(new ExclusionStrategy() {
+            @Override
+            public boolean shouldSkipField(FieldAttributes f) {
+                return !f.getDeclaringClass().equals(pojo);
+            }
+
+            @Override
+            public boolean shouldSkipClass(Class clazz) {
+                return false;
+            }
+        }).create();
         this.schema = ReflectData.AllowNull.get().getSchema(pojo);
         this.schemaInfo = new SchemaInfo();
@@ -55,9 +68,10 @@ private JSONSchema(Class pojo, Map properties) {
     @Override
     public byte[] encode(T message) throws SchemaSerializationException {
+
         try {
-            return objectMapper.writeValueAsBytes(message);
-        } catch (JsonProcessingException e) {
+            return this.gson.toJson(message).getBytes();
+        } catch (RuntimeException e) {
             throw new SchemaSerializationException(e);
         }
     }
@@ -65,8 +79,8 @@ private JSONSchema(Class pojo, Map properties) {
     @Override
     public T decode(byte[] bytes) {
         try {
-            return objectMapper.readValue(new String(bytes), pojo);
-        } catch (IOException e) {
+            return this.gson.fromJson(new String(bytes), this.pojo);
+        } catch (RuntimeException e) {
             throw new RuntimeException(new SchemaSerializationException(e));
         }
     }
@@ -85,6 +99,7 @@ public SchemaInfo getSchemaInfo() {
     public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
         SchemaInfo backwardsCompatibleSchemaInfo;
         try {
+            ObjectMapper objectMapper = new ObjectMapper();
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
             JsonSchema jsonBackwardsCompatibileSchema = schemaGen.generateSchema(pojo);
             backwardsCompatibleSchemaInfo = new SchemaInfo();
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index 7a677f4e69..4c6426c952 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,17 +18,18 @@
  */
 package org.apache.pulsar.client.schema;
-import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
-import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
-
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.JS
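The effect of the exclusion rule in this diff (`!f.getDeclaringClass().equals(pojo)`) can be illustrated without Gson. The sketch below is not the PR's code: it uses plain reflection to list which fields of a derived instance would be kept under the same rule, and the classes `Foo` and `DerivedFoo` and the method `keptFieldNames` are hypothetical names chosen for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class DeclaredFieldFilterSketch {
    // Hypothetical schema class and a subclass that adds one field.
    static class Foo { String name = "foo"; int count = 1; }
    static class DerivedFoo extends Foo { String extra = "only-in-derived"; }

    // Walks the class hierarchy of the message and keeps only the fields
    // whose declaring class is exactly the schema's pojo class.
    static List<String> keptFieldNames(Object message, Class<?> pojo) {
        List<String> kept = new ArrayList<>();
        for (Class<?> c = message.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (f.isSynthetic() || Modifier.isStatic(f.getModifiers())) {
                    continue; // ignore compiler-generated and static fields
                }
                if (f.getDeclaringClass().equals(pojo)) {
                    kept.add(f.getName());
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Fields declared by DerivedFoo are filtered out; those declared by Foo remain.
        System.out.println(keptFieldNames(new DerivedFoo(), Foo.class));
    }
}
```

One consequence that seems to follow from the same rule, stated here as an observation and not something the thread confirms: fields that the pojo class itself inherits from its own superclass have a different declaring class, so they would be skipped as well.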
[GitHub] sijie commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#discussion_r217570601

## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java ##
@@ -111,6 +114,26 @@ public void initialize() {
         }
     }
+    static FunctionMetaData normalizeFunctionMetaData(FunctionMetaData fmd) {
+        if (!fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().isEmpty()) {
+            FunctionDetails.Builder fdb = FunctionDetails.newBuilder(fmd.getFunctionDetails());
+            for (Map.Entry topicEntry : fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().entrySet()) {
+                fdb.getSourceBuilder().putInputSpecs(
+                        topicEntry.getKey(),
+                        ConsumerSpec.newBuilder()
+                                .setSerdeClassName(topicEntry.getValue())
+                                .setIsRegexPattern(topicEntry.getKey() == fmd.getFunctionDetails().getSource().getTopicsPattern())

Review comment:
Why? If a topic pattern was used, this should be true, no?
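Separately from the question of whether the flag should be true or false, the quoted line compares two `String` values with `==`, which in Java tests reference identity and not content. A minimal sketch of a content-based check follows; the class and method names are hypothetical, and it assumes that an unset pattern is represented by an empty string.

```java
import java.util.Objects;

class RegexFlagSketch {
    // Content-based comparison; an unset (null or empty) pattern never matches.
    static boolean isRegexPattern(String topicKey, String topicsPattern) {
        if (topicsPattern == null || topicsPattern.isEmpty()) {
            return false;
        }
        return Objects.equals(topicKey, topicsPattern);
    }

    public static void main(String[] args) {
        String pattern = "persistent://public/default/in-.*";
        String sameContent = new String(pattern); // equal content, distinct object

        System.out.println(sameContent == pattern);               // false: reference comparison
        System.out.println(isRegexPattern(sameContent, pattern)); // true: content comparison
    }
}
```

With `==`, the result depends on whether both values happen to be the same object, which is not something a caller reading from a deserialized message can rely on.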
[GitHub] merlimat commented on issue #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one
merlimat commented on issue #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one URL: https://github.com/apache/incubator-pulsar/pull/2580#issuecomment-421185405 @jiazhai I missed that in the original PR. I think this should reduce the publicly exposed API in the Python client.
[GitHub] merlimat opened a new pull request #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one
merlimat opened a new pull request #2580: [Python] Consolidated duplicated subscribe_*() methods into a single one URL: https://github.com/apache/incubator-pulsar/pull/2580

### Motivation

In Python client lib, removed duplicated code and consolidated `subscribe_topics()` and `subscribe_topic_pattern()` into `subscribe()`.
[GitHub] srkukarni opened a new pull request #2579: Misc fixes
srkukarni opened a new pull request #2579: Misc fixes URL: https://github.com/apache/incubator-pulsar/pull/2579

### Motivation

This PR addresses a couple of minor nits:

1. Add a log statement in Sink to make it consistent with the Source.
2. Add AllArgs/NoArgs constructors for some test-related objects.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.
[GitHub] srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes
srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes URL: https://github.com/apache/incubator-pulsar/pull/2577#issuecomment-421182362 run cpp tests run integration tests run java8 tests
[GitHub] srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes
srkukarni commented on issue #2577: fix behavior of JSONSchema for derived classes URL: https://github.com/apache/incubator-pulsar/pull/2577#issuecomment-421182305 run tests
[GitHub] merlimat commented on issue #2508: PIP-22: Dead Letter Topic
merlimat commented on issue #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#issuecomment-421181976 run java8 tests
[incubator-pulsar] branch master updated: ManagedLedger only closes ledger on error if current ledger (#240) (#2573)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 7c62ecf ManagedLedger only closes ledger on error if current ledger (#240) (#2573)
7c62ecf is described below

commit 7c62ecf4d75ad396066add41246fcb4f92a7c301
Author: Ivan Kelly
AuthorDate: Fri Sep 14 00:59:55 2018 +0200

    ManagedLedger only closes ledger on error if current ledger (#240) (#2573)

    If we have a managed ledger, ml and we write 2 entries to it, if both entries fail, both will end up calling ManagedLedgerImpl#ledgerClosed with the ledger the write failed on as a parameter. However, depending on timing, the second call to ledgerClosed could end up adding a new ledger to the ledger list, even though the current ledger is _not_ failing (as the failing ledger was replaced by the first call).

    This was the cause of a flake in ManagedLedgerErrorsTest#recoverLongTimeAfterMultipleWriteErrors as reported in (#240).

    However, it's not possible to get a deterministic test for this as the timings need to be very precise. The failing addComplete needs to run before first error handling completes, but the runnable with ledgerClosed for the second failure needs to run after the first error handling completes, but before the write resends from the first error handling complete.
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6e4a609..f332e17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1277,7 +1277,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     synchronized void ledgerClosed(final LedgerHandle lh) {
         final State state = STATE_UPDATER.get(this);
-        if (state == State.ClosingLedger || state == State.LedgerOpened) {
+        LedgerHandle currentLedger = this.currentLedger;
+        if (currentLedger == lh && (state == State.ClosingLedger || state == State.LedgerOpened)) {
             STATE_UPDATER.set(this, State.ClosedLedger);
         } else if (state == State.Closed) {
             // The managed ledger was closed during the write operation
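The guard added by this commit can be shown in isolation with a much smaller model. The sketch below is a simplification under stated assumptions and not ManagedLedgerImpl: `StaleCloseGuardSketch`, `Ledger`, and `rollovers` are hypothetical names, and the replacement ledger is created synchronously, whereas the real code does this asynchronously.

```java
class StaleCloseGuardSketch {
    enum State { LedgerOpened, ClosingLedger, ClosedLedger }

    // Hypothetical stand-in for a ledger handle.
    static final class Ledger {
        final long id;
        Ledger(long id) { this.id = id; }
    }

    private State state = State.LedgerOpened;
    private Ledger currentLedger = new Ledger(1);
    private int rollovers = 0;

    synchronized Ledger currentLedger() { return currentLedger; }
    synchronized int rollovers() { return rollovers; }

    // Only the failure report for the ledger that is still current triggers a
    // rollover; a late report for an already replaced ledger is ignored.
    synchronized boolean ledgerClosed(Ledger lh) {
        if (currentLedger == lh && (state == State.ClosingLedger || state == State.LedgerOpened)) {
            state = State.ClosedLedger;
            // Simplification: replace the ledger immediately.
            currentLedger = new Ledger(lh.id + 1);
            state = State.LedgerOpened;
            rollovers++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StaleCloseGuardSketch ml = new StaleCloseGuardSketch();
        Ledger failing = ml.currentLedger();
        ml.ledgerClosed(failing); // first failed write: rolls over
        ml.ledgerClosed(failing); // second failed write on the same ledger: ignored
        System.out.println(ml.rollovers());
    }
}
```

Identity comparison (`==`) is the intended check here, since the question is whether the callback refers to the very handle that is still current, not to an equal-looking one.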
[GitHub] aahmed-se opened a new pull request #2578: [WIP] Add support for schema extraction from a jar
aahmed-se opened a new pull request #2578: [WIP] Add support for schema extraction from a jar URL: https://github.com/apache/incubator-pulsar/pull/2578 Add options to create a schema based on pojo class in a jar.
[GitHub] merlimat closed pull request #2573: ManagedLedger only closes ledger on error if current ledger (#240)
merlimat closed pull request #2573: ManagedLedger only closes ledger on error if current ledger (#240) URL: https://github.com/apache/incubator-pulsar/pull/2573 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6e4a6090c5..f332e17710 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1277,7 +1277,8 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { synchronized void ledgerClosed(final LedgerHandle lh) { final State state = STATE_UPDATER.get(this); -if (state == State.ClosingLedger || state == State.LedgerOpened) { +LedgerHandle currentLedger = this.currentLedger; +if (currentLedger == lh && (state == State.ClosingLedger || state == State.LedgerOpened)) { STATE_UPDATER.set(this, State.ClosedLedger); } else if (state == State.Closed) { // The managed ledger was closed during the write operation
[GitHub] srkukarni commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI
srkukarni commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#discussion_r217554182 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java ## @@ -111,6 +114,26 @@ public void initialize() { } } +static FunctionMetaData normalizeFunctionMetaData(FunctionMetaData fmd) { +if (!fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().isEmpty()) { +FunctionDetails.Builder fdb = FunctionDetails.newBuilder(fmd.getFunctionDetails()); +for (Map.Entry topicEntry : fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().entrySet()) { +fdb.getSourceBuilder().putInputSpecs( +topicEntry.getKey(), +ConsumerSpec.newBuilder() +.setSerdeClassName(topicEntry.getValue()) +.setIsRegexPattern(topicEntry.getKey() == fmd.getFunctionDetails().getSource().getTopicsPattern()) Review comment: this should be false
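Separately from the reviewer's point, the quoted line compares two strings with `==`, which in Java tests object identity rather than content. A minimal sketch of the difference follows; the class and method names are hypothetical and the topic string is made up for illustration.

```java
import java.util.Objects;

/** Illustrates reference vs. content comparison of Java strings. */
final class StringCompareSketch {
    /** Reference comparison, as written in the quoted diff line. */
    static boolean sameReference(String a, String b) {
        return a == b;
    }

    /** Content comparison that also tolerates nulls. */
    static boolean sameContent(String a, String b) {
        return Objects.equals(a, b);
    }

    /** Returns two strings with equal content that are distinct objects. */
    static String[] equalButDistinct() {
        String text = "persistent://public/default/input"; // made-up topic name
        return new String[] { text, new String(text) };
    }

    public static void main(String[] args) {
        String[] pair = equalButDistinct();
        System.out.println("==     : " + sameReference(pair[0], pair[1]));
        System.out.println("equals : " + sameContent(pair[0], pair[1]));
    }
}
```

Whether the flag should be computed at all is the reviewer's call; the sketch only shows that, if a comparison is kept, `equals` is the content check.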
[GitHub] sijie commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI
sijie commented on a change in pull request #2570: Fix BC issue in functions trigger function submitted by old CLI URL: https://github.com/apache/incubator-pulsar/pull/2570#discussion_r217553912 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java ## @@ -595,16 +594,23 @@ public Response triggerFunction(final String tenant, final String namespace, fin String inputTopicToWrite; if (topic != null) { inputTopicToWrite = topic; +} else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size() == 1) { Review comment: changed to normalizing the function metadata
[GitHub] jerrypeng opened a new pull request #2577: fix behavior of JSONSchema for derived classes
jerrypeng opened a new pull request #2577: fix behavior of JSONSchema for derived classes URL: https://github.com/apache/incubator-pulsar/pull/2577 ### Motivation
[GitHub] dsambandam opened a new issue #2576: Pulsar client consume command fails with
dsambandam opened a new issue #2576: Pulsar client consume command fails with URL: https://github.com/apache/incubator-pulsar/issues/2576 Expected behavior Tell us what should happen Actual behavior ./bin/pulsar-client consume persistent://public/default/test/will_test -s my-topic -n 10 -r 0 webServiceUrl=https://broker1:8443/ brokerServiceUrl=pulsar+ssl://broker1:6651/ useTls=true tlsTrustCertsFilePath=/Path/my-ca/certs/ca.cert.pem #tlsEnableHostnameVerification=true authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls authParams=tlsCertFile:/Path/my-ca/broker.cert.pem,tlsKeyFile:/Path/my-ca/broker.key-pk8.pem 14:59:46.360 [pulsar-io-21-31] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:45346] Client successfully authenticated with tls role admin and originalPrincipal null 14:59:46.363 [ForkJoinPool.commonPool-worker-3] WARN org.apache.pulsar.broker.web.PulsarWebResource - [admin] Cluster does not exist: requested=default 14:59:46.363 [ForkJoinPool.commonPool-worker-3] WARN org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup admin for topic persistent://public/default/test/auth_test with error org.apache.pulsar.broker.web.RestException: Cluster does not exist: cluster=default java.util.concurrent.CompletionException: org.apache.pulsar.broker.web.RestException: Cluster does not exist: cluster=default 2018-09-13 14:56:20,170 pulsar-client-io-1-1 DEBUG AsyncLogger.ThreadNameStrategy=UNCACHED (user specified null, default is UNCACHED) 14:56:20.356 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer perf with config: { "topicNames" : [ "persistent://public/default/test/will_test" ], "topicsPattern" : null, "subscriptionName" : "my-topic", "subscriptionType" : "Exclusive", "receiverQueueSize" : 1000, "acknowledgementsGroupTimeMicros" : 10, "maxTotalReceiverQueueSizeAcrossPartitions" : 5, "consumerName" : null, "ackTimeoutMillis" : 0, "priorityLevel" : 0, 
"cryptoFailureAction" : "FAIL", "properties" : { }, "readCompacted" : false, "subscriptionInitialPosition" : "Latest", "patternAutoDiscoveryPeriod" : 1 } 14:56:20.359 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: { "serviceUrl" : "pulsar+ssl://localhost:6651/", "operationTimeoutMs" : 3, "statsIntervalSeconds" : 60, "numIoThreads" : 1, "numListenerThreads" : 1, "connectionsPerBroker" : 1, "useTcpNoDelay" : true, "useTls" : true, "tlsTrustCertsFilePath" : "/PATH/my-ca/certs/ca.cert.pem", "tlsAllowInsecureConnection" : true, "tlsHostnameVerificationEnable" : false, "concurrentLookupRequest" : 5000, "maxLookupRequest" : 5, "maxNumberOfRejectedRequestPerConnection" : 50, "keepAliveIntervalSeconds" : 30 } 14:56:20.381 [pulsar-client-io-1-1] ERROR org.apache.pulsar.client.impl.ClientCnx - [id: 0x43eb918e, L:/127.0.0.1:45286 - R:localhost/127.0.0.1:6651] Close connection becaues received internal-server error org.apache.pulsar.broker.web.RestException: Cluster does not exist: cluster=default 14:56:20.384 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.BinaryProtoLookupService - [persistent://public/default/test/will_test] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: org.apache.pulsar.broker.web.RestException: Cluster does not exist: cluster=default Steps to reproduce How can we reproduce the issue System configuration **Pulsar version**: x.y This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on issue #2508: PIP-22: Dead Letter Topic
sijie commented on issue #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#issuecomment-421164855 run java8 tests run integration tests
[GitHub] sijie closed pull request #2548: Stop all functions gracefully on closing worker-service
sijie closed pull request #2548: Stop all functions gracefully on closing worker-service URL: https://github.com/apache/incubator-pulsar/pull/2548 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 93828de40d..75a70e0fb6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -403,6 +403,25 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu return Response.status(Status.OK).build(); } +/** + * It stops all functions instances owned by current worker + * @throws Exception + */ +public void stopAllOwnedFunctions() throws Exception { +final String workerId = this.workerConfig.getWorkerId(); +Map assignments = workerIdToAssignments.get(workerId); +if (assignments != null) { +assignments.values().forEach(assignment -> { +String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); +try { +stopFunction(fullyQualifiedInstanceId, false); +} catch (Exception e) { +log.warn("Failed to stop function {} - {}", fullyQualifiedInstanceId, e.getMessage()); +} +}); +} +} + private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception { log.info("[{}] {}..", restart ? 
"restarting" : "stopping", fullyQualifiedInstanceId); FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); @@ -658,6 +677,7 @@ private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRun @Override public void close() throws Exception { +stopAllOwnedFunctions(); this.functionActioner.close(); this.functionAssignmentTailer.close(); if (runtimeFactory != null) { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: optimizing throughput in Pulsar Presto connector (#2564)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6ef7aca optimizing throughput in Pulsar Presto connector (#2564) 6ef7aca is described below commit 6ef7acaf37d57769c4d9dbf7558ef627ce061339 Author: Boyang Jerry Peng AuthorDate: Thu Sep 13 14:50:01 2018 -0700 optimizing throughput in Pulsar Presto connector (#2564) ### Motivation 1. Currently, the presto pulsar connector reads synchronously from bookkeeper when it has run out of entries to process. Basically, we process a batch of entries and then we read more. Ideally we should be reading and processing in parallel to increase throughput. 2. Each split initializes its own ManagedLedgerFactory/Bookkeeper client. We really just need one bookkeeper client to be shared among threads. ### Modifications 1. Rewrote the logic in the Presto Pulsar connector to read async and process in parallel 2. 
Cache ManagedLedgerFactory to be used across splits ### Result I see about 2X throughput improvement on single node as well as cluster (2 brokers, 3 bookies, 4 presto workers including coordinator) on AWS --- conf/presto/catalog/pulsar.properties | 6 +- .../apache/pulsar/sql/presto/PulsarConnector.java | 5 + .../pulsar/sql/presto/PulsarConnectorCache.java| 64 +++ .../pulsar/sql/presto/PulsarConnectorConfig.java | 26 ++- .../pulsar/sql/presto/PulsarRecordCursor.java | 201 +++-- .../pulsar/sql/presto/TestPulsarConnector.java | 146 --- .../pulsar/sql/presto/TestPulsarRecordCursor.java | 1 + 7 files changed, 326 insertions(+), 123 deletions(-) diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 23b945e..5f922e5 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181 # minimum number of entries to read at a single time pulsar.entry-read-batch-size=100 # default number of splits to use per query -pulsar.target-num-splits=4 +pulsar.target-num-splits=2 +# max message queue size +pulsar.max-split-message-queue-size=1 +# max entry queue size +pulsar.max-split-entry-queue-size = 1000 diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java index 1d89b51..498583d 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java @@ -87,6 +87,11 @@ public class PulsarConnector implements Connector { log.error(e, "Failed to close pulsar connector"); } try { +PulsarConnectorCache.shutdown(); +} catch (Exception e) { +log.error("Failed to shutdown pulsar connector cache"); +} +try { lifeCycleManager.stop(); } catch (Exception e) { log.error(e, "Error shutting down connector"); diff 
--git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java new file mode 100644 index 000..d13ddcd --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration; + +public class PulsarConnectorCache { + +private static PulsarConnectorCache instance; + +private final ManagedLedg
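The first modification in the commit message above (read asynchronously, process in parallel) can be pictured as a bounded producer/consumer queue. The sketch below is an illustration under stated assumptions, not the connector's code: `ReadAheadSketch`, the integer "entries", and the sentinel value are all hypothetical, and the queue capacity only loosely corresponds to settings such as `pulsar.max-split-entry-queue-size`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** A reader thread fills a bounded queue while the caller processes entries concurrently. */
final class ReadAheadSketch {
    private static final int END_OF_STREAM = -1; // hypothetical sentinel marking the last entry

    static long sumWithReadAhead(int entries, int queueSize) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);

        // "Reader": blocks in put() once it is queueSize entries ahead of the processor.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 1; i <= entries; i++) {
                    queue.put(i);
                }
                queue.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        // "Processor": consumes while the reader keeps fetching in the background.
        long sum = 0;
        try {
            for (int e = queue.take(); e != END_OF_STREAM; e = queue.take()) {
                sum += e;
            }
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumWithReadAhead(100, 10));
    }
}
```

The bounded queue is the design point: it lets reading overlap with processing while capping how far the reader can run ahead, so memory use stays limited.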
[incubator-pulsar] branch master updated: Stop all functions gracefully on closing worker-service (#2548)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 95ed9b9 Stop all functions gracefully on closing worker-service (#2548) 95ed9b9 is described below commit 95ed9b91af20bcdd396ee34d3ea6cc94a52fea31 Author: Rajan Dhabalia AuthorDate: Thu Sep 13 14:50:35 2018 -0700 Stop all functions gracefully on closing worker-service (#2548) ### Motivation Right now, stopping [WorkerService](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java#L189) doesn't stop functions, and all the threads stay alive even after `WorkerService` is stopped. ### Modifications Stop all function resources gracefully while stopping worker service. ### Result Function threads will not stay alive after the worker-service is stopped. --- .../functions/worker/FunctionRuntimeManager.java | 20 1 file changed, 20 insertions(+) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index ee6eeec..43cd27b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -389,6 +389,25 @@ public class FunctionRuntimeManager implements AutoCloseable{ return Response.status(Status.OK).build(); } +/** + * It stops all functions instances owned by current worker + * @throws Exception + */ +public void stopAllOwnedFunctions() throws Exception { +final String workerId = this.workerConfig.getWorkerId(); +Map assignments = workerIdToAssignments.get(workerId); +if (assignments != null) { 
+assignments.values().forEach(assignment -> { +String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); +try { +stopFunction(fullyQualifiedInstanceId, false); +} catch (Exception e) { +log.warn("Failed to stop function {} - {}", fullyQualifiedInstanceId, e.getMessage()); +} +}); +} +} + private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception { log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId); FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); @@ -647,6 +666,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Override public void close() throws Exception { +stopAllOwnedFunctions(); this.functionActioner.close(); this.functionAssignmentTailer.close(); if (runtimeFactory != null) {
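The shutdown pattern in the diff above (stop every owned instance, and log rather than propagate a failure so one bad instance does not block the rest) can be sketched in isolation. `StopAllSketch`, `Stopper`, and the instance id strings are hypothetical names for illustration; the real method works on `Assignment` objects and the worker's runtime state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Best-effort "stop everything" loop: a failure on one instance does not abort the others. */
final class StopAllSketch {
    /** Hypothetical stand-in for the per-instance stop call. */
    interface Stopper {
        void stop(String instanceId) throws Exception;
    }

    /** Tries to stop every instance and returns the ids that failed to stop. */
    static List<String> stopAll(Collection<String> instanceIds, Stopper stopper) {
        List<String> failed = new ArrayList<>();
        if (instanceIds != null) { // nothing assigned to this worker
            for (String instanceId : instanceIds) {
                try {
                    stopper.stop(instanceId);
                } catch (Exception e) {
                    // The diff logs a warning here; the sketch records the id instead.
                    failed.add(instanceId);
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> failed = stopAll(Arrays.asList("fn-a:0", "fn-b:0", "fn-c:0"), id -> {
            if (id.equals("fn-b:0")) {
                throw new Exception("simulated failure");
            }
        });
        System.out.println("failed to stop: " + failed);
    }
}
```

Returning the failed ids is a choice made for the sketch so the behaviour can be checked; the committed code only logs.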
[GitHub] sijie closed pull request #2567: Another fix for website build website/scripts/replace.js
sijie closed pull request #2567: Another fix for website build website/scripts/replace.js URL: https://github.com/apache/incubator-pulsar/pull/2567 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js index 13c95691a9..2970688dce 100644 --- a/site2/website/scripts/replace.js +++ b/site2/website/scripts/replace.js @@ -118,7 +118,7 @@ for (v of versions) { `${v}`, binaryReleaseUrl(`${v}`), connectorReleaseUrl(`${v}`), - prestoPulsarReleaseUrl(`${latestVersion}`) + prestoPulsarReleaseUrl(`${latestVersion}`), downloadPageUrl(), rpmReleaseUrl(`${v}`, ""), rpmReleaseUrl(`${v}`, "-debuginfo"),
[incubator-pulsar] branch master updated: Another fix for website build (#2567)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0964dad Another fix for website build (#2567) 0964dad is described below commit 0964dadcd3f425a2d4c424d4ab1de1fb7e35ce84 Author: Matteo Merli AuthorDate: Thu Sep 13 17:49:36 2018 -0400 Another fix for website build (#2567) ### Motivation Comma is missing in the javascript list. --- site2/website/scripts/replace.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js index 13c9569..2970688 100644 --- a/site2/website/scripts/replace.js +++ b/site2/website/scripts/replace.js @@ -118,7 +118,7 @@ for (v of versions) { `${v}`, binaryReleaseUrl(`${v}`), connectorReleaseUrl(`${v}`), - prestoPulsarReleaseUrl(`${latestVersion}`) + prestoPulsarReleaseUrl(`${latestVersion}`), downloadPageUrl(), rpmReleaseUrl(`${v}`, ""), rpmReleaseUrl(`${v}`, "-debuginfo"),
[GitHub] sijie closed pull request #2564: optimizing throughput in Pulsar Presto connector
sijie closed pull request #2564: optimizing throughput in Pulsar Presto connector URL: https://github.com/apache/incubator-pulsar/pull/2564 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 23b945e3b3..5f922e5071 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181 # minimum number of entries to read at a single time pulsar.entry-read-batch-size=100 # default number of splits to use per query -pulsar.target-num-splits=4 +pulsar.target-num-splits=2 +# max message queue size +pulsar.max-split-message-queue-size=1 +# max entry queue size +pulsar.max-split-entry-queue-size = 1000 diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java index 1d89b519c2..498583d3c6 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java @@ -86,6 +86,11 @@ public final void shutdown() { } catch (Exception e) { log.error(e, "Failed to close pulsar connector"); } +try { +PulsarConnectorCache.shutdown(); +} catch (Exception e) { +log.error("Failed to shutdown pulsar connector cache"); +} try { lifeCycleManager.stop(); } catch (Exception e) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java new file mode 100644 index 00..d13ddcd84c --- /dev/null +++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration; + +public class PulsarConnectorCache { + +private static PulsarConnectorCache instance; + +private final ManagedLedgerFactory managedLedgerFactory; + +private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { +this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig); +} + +public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { +synchronized (PulsarConnectorCache.class) { +if (instance == null) { +instance = new PulsarConnectorCache(pulsarConnectorConfig); +} +} +return instance; +} + +private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { +ClientConfiguration 
bkClientConfiguration = new ClientConfiguration() +.setZkServers(pulsarConnectorConfig.getZookeeperUri()) +.setAllowShadedLedgerManagerFactoryClass(true) + .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.") +.setReadEntryTimeout(60); +return new ManagedLedgerFactoryImpl(bkClientConfiguration); +} + +public ManagedLedgerFactory getManagedLedgerFactory() { +return managedLedgerFactory; +} + +public static void shutdown() throws ManagedLedgerException, InterruptedException { +if (instance != null) { +instance.managedLedgerFactory.shutdown(); +instance = null; +} +} +} diff --git a/pulsar-
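The `PulsarConnectorCache` in the diff above is a lazily created, process-wide holder for one shared factory. A stripped-down sketch of that shape is shown below; `SharedFactorySketch` is a hypothetical name and it holds no real BookKeeper client. One deliberate difference from the diff, labelled here as this sketch's choice: the instance is read and returned while still holding the lock, so a concurrent `shutdown()` cannot slip in between the null check and the return.

```java
/** Lazily created shared instance with an explicit shutdown, guarded by the class lock. */
final class SharedFactorySketch {
    private static SharedFactorySketch instance;

    private boolean open = true;

    private SharedFactorySketch() {
    }

    /** Returns the shared instance, creating it on first use. */
    static synchronized SharedFactorySketch get() {
        if (instance == null) {
            instance = new SharedFactorySketch();
        }
        return instance; // returned under the lock, unlike the quoted diff
    }

    /** Releases the shared instance; a later get() creates a fresh one. */
    static synchronized void shutdown() {
        if (instance != null) {
            instance.open = false;
            instance = null;
        }
    }

    synchronized boolean isOpen() {
        return open;
    }

    public static void main(String[] args) {
        SharedFactorySketch first = get();
        System.out.println("same instance: " + (first == get()));
        shutdown();
        System.out.println("first still open: " + first.isOpen());
        System.out.println("new instance after shutdown: " + (first != get()));
    }
}
```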
[GitHub] sijie commented on issue #2500: [schema] add schemas for primtive types
sijie commented on issue #2500: [schema] add schemas for primtive types URL: https://github.com/apache/incubator-pulsar/pull/2500#issuecomment-421164099 run integration tests
[GitHub] sijie commented on issue #2500: [schema] add schemas for primtive types
sijie commented on issue #2500: [schema] add schemas for primtive types URL: https://github.com/apache/incubator-pulsar/pull/2500#issuecomment-421164188 run java8 tests
[GitHub] sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…
sijie commented on issue #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… URL: https://github.com/apache/incubator-pulsar/pull/2543#issuecomment-421159758 @merlimat can you review this PR?
[GitHub] srids commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0
srids commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0 URL: https://github.com/apache/incubator-pulsar/issues/2568#issuecomment-421125156 ic. sounds good to fix in 2.1.1 then. thanks.
[GitHub] sijie commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0
sijie commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0 URL: https://github.com/apache/incubator-pulsar/issues/2568#issuecomment-421124528 @srids yes I mean 36. a release typically requires reviews and votes in apache software foundation. so if things are missing, we are fixing it in the next release :)
[GitHub] srids commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0
srids commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0 URL: https://github.com/apache/incubator-pulsar/issues/2568#issuecomment-421120900 you mean 36 for macos :) - yea that should work, but I think it would be more consistent to publish 2.1.0 for 36, as it's already done for Linux but not for MacOS.
[GitHub] rdhabalia commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg
rdhabalia commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549#discussion_r217502837 ## File path: pulsar-functions/proto/src/main/proto/Function.proto ## @@ -133,4 +133,5 @@ message Instance { message Assignment { Instance instance = 1; string workerId = 2; +bool removed = 3; Review comment: > Why do we take this approach. Instead we could just write an empty value to the key, which is the removal? yes, that's true. I didn't see that removal at compaction and I also had the concern of removing defunct assignments. I will fix it and will also fix compaction if necessary.
[GitHub] sijie commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0
sijie commented on issue #2568: MacOS: Could not find a version that satisfies the requirement pulsar-client==2.1.0 URL: https://github.com/apache/incubator-pulsar/issues/2568#issuecomment-421119465 @srids gotcha. we have a release 2.1.1 upcoming in 2 days. we can try to publish 37 for macos. does that work for you?
[GitHub] jerrypeng commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg
jerrypeng commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549#discussion_r217500097 ## File path: pulsar-functions/proto/src/main/proto/Function.proto ## @@ -133,4 +133,5 @@ message Instance { message Assignment { Instance instance = 1; string workerId = 2; +bool removed = 3; Review comment: Ya we should set the key of the assignment to be null so that during compaction the assignment will be removed. If we don't do that, the assignment topic will still be littered with potentially many defunct assignments.
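The removal-by-compaction idea discussed in this review thread can be sketched with a toy model. This is only an illustration, assuming that an empty payload written for a key acts as a removal marker during compaction, as the reviewers suggest; `CompactionSketch`, `Entry`, and `compact` are hypothetical names and are not part of Pulsar.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of key-based compaction; this is NOT Pulsar's compactor.
class CompactionSketch {

    // One entry in a hypothetical assignment log: a key and a payload.
    static final class Entry {
        final String key;
        final byte[] value;

        Entry(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keep only the latest value per key; an empty payload removes the key.
    static Map<String, byte[]> compact(List<Entry> log) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null || e.value.length == 0) {
                latest.remove(e.key); // removal marker: drop the assignment
            } else {
                latest.put(e.key, e.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("tenant/ns/fn:0", "worker-1".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("tenant/ns/fn:1", "worker-2".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("tenant/ns/fn:0", new byte[0])); // instance 0 is unassigned
        System.out.println(compact(log).keySet());
    }
}
```

With this scheme no extra `removed` flag is needed in the message: the absence of a payload carries the removal, and defunct assignments disappear once the log is compacted.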
[GitHub] aahmed-se commented on issue #2504: Add Presto Sql Test
aahmed-se commented on issue #2504: Add Presto Sql Test URL: https://github.com/apache/incubator-pulsar/pull/2504#issuecomment-42864 retest this please
[GitHub] srkukarni commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg
srkukarni commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549#discussion_r217493540 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java ## @@ -34,15 +36,16 @@ private final FunctionRuntimeManager functionRuntimeManager; private final Reader reader; Review comment: Instead of byte[], do you think we could make it a typed reader?
[GitHub] sijie commented on issue #2567: Another fix for website build website/scripts/replace.js
sijie commented on issue #2567: Another fix for website build website/scripts/replace.js URL: https://github.com/apache/incubator-pulsar/pull/2567#issuecomment-421110786 run java8 tests
[GitHub] srkukarni commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg
srkukarni commented on a change in pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549#discussion_r217492954 ## File path: pulsar-functions/proto/src/main/proto/Function.proto ## @@ -133,4 +133,5 @@ message Instance { message Assignment { Instance instance = 1; string workerId = 2; +bool removed = 3; Review comment: Why do we take this approach. Instead we could just write an empty value to the key, which is the removal?
[GitHub] sijie commented on issue #2503: add auto ack and timeout configurable
sijie commented on issue #2503: add auto ack and timeout configurable URL: https://github.com/apache/incubator-pulsar/pull/2503#issuecomment-421110182 run java8 tests
[GitHub] sijie commented on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster
sijie commented on issue #2560: [proxy][functions] Issue #2154: proxy should be able to forward rest requests to function workers cluster URL: https://github.com/apache/incubator-pulsar/pull/2560#issuecomment-421110056 run integration tests
[GitHub] sijie closed pull request #2516: Fix: get function status with auth enable
sijie closed pull request #2516: Fix: get function status with auth enable URL: https://github.com/apache/incubator-pulsar/pull/2516 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index def5452dbb..62d12ec9f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -160,7 +160,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( -tenant, namespace, functionName, instanceId); +tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 7dc7050ac5..1d315d78f0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -110,9 +110,8 @@ public FunctionStatusList getFunctionStatus( } } -@Override -public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) -throws PulsarAdminException { +public FunctionStatus getFunctionStatus( +String tenant, String namespace, String function, int id) throws PulsarAdminException { try { Response response = request( 
functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 93828de40d..ee6eeecd50 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; @@ -248,7 +249,7 @@ public synchronized void removeAssignments(Collection assignments) { * @return the function status */ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, - String functionName, int instanceId) { +String functionName, int instanceId, URI uri) { Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); @@ -306,23 +307,8 @@ public synchronized void removeAssignments(Collection assignments) { return functionStatusBuilder.build(); } -Client client = ClientBuilder.newClient(); - -// TODO: implement authentication/authorization -String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status";, -workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId)) -.request(MediaType.TEXT_PLAIN) -.get(String.class); - -InstanceCommunication.FunctionStatus.Builder functionStatusBuilder 
= InstanceCommunication.FunctionStatus.newBuilder(); -try { - org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder); -} catch (IOException e) { -log.warn("Got invalid function status response from {}", workerInfo, e); -throw new RuntimeException(e); -} -functionStatusBuilder.setWorkerId(assignedWorkerId); -fun
[incubator-pulsar] branch master updated: Fix: get function status with auth enable (#2516)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e5a7b53 Fix: get function status with auth enable (#2516) e5a7b53 is described below commit e5a7b532ffbf1d8813fc31403162fe3c17e937f5 Author: Rajan Dhabalia AuthorDate: Thu Sep 13 11:29:37 2018 -0700 Fix: get function status with auth enable (#2516) --- .../pulsar/broker/admin/impl/FunctionsBase.java| 2 +- .../client/admin/internal/FunctionsImpl.java | 5 ++- .../functions/worker/FunctionRuntimeManager.java | 41 -- .../functions/worker/rest/api/FunctionsImpl.java | 6 ++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 2 +- 5 files changed, 23 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index def5452..62d12ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -160,7 +160,7 @@ public class FunctionsBase extends AdminResource implements Supplierhttp://%s:%d/admin/functions/%s/%s/%s/%d/status";, -workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId)) -.request(MediaType.TEXT_PLAIN) -.get(String.class); - -InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); -try { - org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder); -} catch (IOException e) { -log.warn("Got invalid function status response from {}", workerInfo, e); -throw new RuntimeException(e); -} -functionStatusBuilder.setWorkerId(assignedWorkerId); -functionStatus = functionStatusBuilder.build(); +URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); +throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } return functionStatus; @@ -426,9 +412,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ * @param namespace the namespace the function belongs to * @param functionName the function name * @return a list of function statuses + * @throws PulsarAdminException */ public InstanceCommunication.FunctionStatusList getAllFunctionStatus( -String tenant, String namespace, String functionName) { +String tenant, String namespace, String functionName) throws PulsarAdminException { Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -438,13 +425,15 @@ public class FunctionRuntimeManager implements AutoCloseable{ } for (Assignment assignment : assignments) { - -InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus( - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), -assignment.getInstance().getInstanceId()); - +boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); +InstanceCommunication.FunctionStatus functionStatus = isOwner +? 
(getFunctionInstanceStatus(tenant, namespace, functionName, +assignment.getInstance().getInstanceId(), null)) +: this.functionAdmin.functions().getFunctionStatus( + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), +assignment.getInstance().getInstanceId()); functionStatusListBuilder.addFunctionStatusList(functionStatus); } return functionStatusListBuilder.build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 136bab0..df82c0d 100644 --- a/pulsar-functions/worker/src/main/java
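The commit above replaces the worker-to-worker HTTP call with a temporary redirect to the worker that owns the instance, built by swapping the host and port of the incoming request URI. A minimal sketch of that URI rewrite, using only `java.net.URI` rather than the JAX-RS `UriBuilder` from the diff; `RedirectSketch`, `redirectTo`, and the example host names and ports are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustration only: rebuilds a request URI for another host, as the redirect in the diff does.
class RedirectSketch {

    // Point the request at the owning worker, keeping scheme, path, and query unchanged.
    static URI redirectTo(URI request, String ownerHost, int ownerPort) {
        try {
            return new URI(request.getScheme(), request.getUserInfo(), ownerHost, ownerPort,
                    request.getPath(), request.getQuery(), request.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot build redirect for " + request, e);
        }
    }

    public static void main(String[] args) {
        URI request = URI.create("http://worker-a:6750/admin/functions/t/ns/fn/0/status");
        // The caller would answer with a temporary redirect to this location.
        System.out.println(redirectTo(request, "worker-b", 6751));
    }
}
```

Because the client follows the redirect itself, it presents its own credentials to the owning worker, which is presumably why this approach works with authentication enabled where the removed unauthenticated call did not.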
[GitHub] sijie commented on issue #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
sijie commented on issue #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener URL: https://github.com/apache/incubator-pulsar/pull/2575#issuecomment-421106390 @jiazhai I think some tests need to be changed.
[GitHub] srkukarni commented on issue #2500: [schema] add schemas for primtive types
srkukarni commented on issue #2500: [schema] add schemas for primtive types URL: https://github.com/apache/incubator-pulsar/pull/2500#issuecomment-421100264 run java8 tests run integration tests
[GitHub] sijie commented on a change in pull request #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
sijie commented on a change in pull request #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener URL: https://github.com/apache/incubator-pulsar/pull/2575#discussion_r217436845 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java ## @@ -615,4 +616,61 @@ public void testTopicsNameSubscribeWithBuilderFail() throws Exception { } } +/** + * Test Listener + */ +@Test(timeOut = 3) +public void testMultiTopicsMessageListener() throws Exception { +String key = "MultiTopicsMessageListenerTest"; +final String subscriptionName = "my-ex-subscription-" + key; +final String messagePredicate = "my-message-" + key + "-"; +final int totalMessages = 6; + +// set latch larger than totalMessages, so timeout message get resend +CountDownLatch latch = new CountDownLatch(totalMessages * 3); + +final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; +List topicNames = Lists.newArrayList(topicName1); + +admin.tenants().createTenant("prop", new TenantInfo()); +admin.topics().createPartitionedTopic(topicName1, 2); + +// 1. producer connect +Producer producer1 = pulsarClient.newProducer().topic(topicName1) +.enableBatching(false) +.messageRoutingMode(MessageRoutingMode.SinglePartition) +.create(); + +// 2. 
Create consumer, set not ack in message listener, so time-out message will resend +Consumer consumer = pulsarClient.newConsumer() +.topics(topicNames) +.subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared) +.ackTimeout(1000, TimeUnit.MILLISECONDS) +.receiverQueueSize(4) +.messageListener((c1, msg) -> { +assertNotNull(msg, "Message cannot be null"); +String receivedMessage = new String(msg.getData()); +latch.countDown(); + +log.info("Received message [{}] in the listener, latch: {}", +receivedMessage, latch.getCount()); +// since not acked, it should retry another time Review comment: nit: we can remove line 658 and 659
[GitHub] sijie commented on issue #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
sijie commented on issue #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener URL: https://github.com/apache/incubator-pulsar/pull/2575#issuecomment-421054906 Let's make this fix as part of 2.2 release
[GitHub] jiazhai opened a new pull request #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
jiazhai opened a new pull request #2575: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
URL: https://github.com/apache/incubator-pulsar/pull/2575

### Motivation
Fix issue #2574: timed-out messages are not redelivered in TopicsConsumer when a message listener is used.

### Modifications
Let ConsumerImpl also track un-acked messages for TopicsConsumer.
Add a unit test.

### Result
The unit test passes.
[GitHub] jiazhai commented on issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
jiazhai commented on issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
URL: https://github.com/apache/incubator-pulsar/issues/2574#issuecomment-421044188

Penghui hit this issue in their test. It may be caused by how un-acked messages are tracked in the individual ConsumerImpl and in MultiTopicsConsumerImpl. MultiTopicsConsumerImpl manages and tracks all un-acked messages, but this breaks down when a MessageListener is used, because MessageListener.received() is called only once, when the message is delivered from the underlying individual ConsumerImpl. The call stack looks like this:

```
MultiTopicsConsumerImpl.startReceivingMessages() // try to receive messages from all individual ConsumerImpl
 \ MultiTopicsConsumerImpl.receiveMessageFromConsumer(ConsumerImpl consumer) // call individual ConsumerImpl.receiveAsync() to import messages, and call MessageListener.received() in the receiveAsync() callback
  \ MultiTopicsConsumerImpl.messageReceived() // called in ConsumerImpl.receiveAsync() callback
   \ listener.received
```

Meanwhile, ConsumerImpl.messageProcessed() does not track messages on behalf of MultiTopicsConsumerImpl:

```
if (partitionIndex != -1) {
    // we should no longer track this message, TopicsConsumer will take care from now onwards
    unAckedMessageTracker.remove(id); // <=== we are not tracking messages
} else {
    unAckedMessageTracker.add(id);
}
```

So a timed-out message is not redelivered by ConsumerImpl and, given the call stack above, MessageListener.received() is never called for it again.
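The tracking gap described in this comment can be reproduced with a small stand-alone model. This is a sketch under assumptions, not the Pulsar client code: `UnackedTrackingSketch` and the `trackForTopicsConsumer` flag are hypothetical, the flag standing in for the behavior change proposed in pull request #2575 (letting the individual consumer keep tracking un-acked messages).

```java
import java.util.HashSet;
import java.util.Set;

// Toy stand-in for a per-consumer un-acked message tracker; not the Pulsar client.
class UnackedTrackingSketch {

    // Only ids held here can be redelivered when the ack timeout fires.
    private final Set<String> tracked = new HashSet<>();

    // Mirrors the branch quoted above. With trackForTopicsConsumer == false, a child
    // consumer of a partitioned or multi-topic consumer (partitionIndex != -1) drops
    // the id, so nothing is left to redeliver on timeout.
    void messageProcessed(String id, int partitionIndex, boolean trackForTopicsConsumer) {
        if (partitionIndex != -1 && !trackForTopicsConsumer) {
            tracked.remove(id);
        } else {
            tracked.add(id);
        }
    }

    // Ids that would be redelivered if the ack timeout fired now.
    Set<String> redeliverOnTimeout() {
        return new HashSet<>(tracked);
    }

    public static void main(String[] args) {
        UnackedTrackingSketch before = new UnackedTrackingSketch();
        before.messageProcessed("msg-1", 0, false);
        System.out.println("without tracking: " + before.redeliverOnTimeout());

        UnackedTrackingSketch after = new UnackedTrackingSketch();
        after.messageProcessed("msg-1", 0, true);
        System.out.println("with tracking:    " + after.redeliverOnTimeout());
    }
}
```

In the first case the redelivery set is empty, which matches the reported symptom: the listener is never invoked a second time for the un-acked message.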
[GitHub] jiazhai opened a new issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
jiazhai opened a new issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener URL: https://github.com/apache/incubator-pulsar/issues/2574 Expected behavior Timed-out messages are redelivered in TopicsConsumer when a message listener is used Actual behavior Tell us what happens instead Steps to reproduce How can we reproduce the issue System configuration **Pulsar version**: x.y
[GitHub] ivankelly opened a new pull request #2573: ManagedLedger only closes ledger on error if current ledger (#240)
ivankelly opened a new pull request #2573: ManagedLedger only closes ledger on error if current ledger (#240) URL: https://github.com/apache/incubator-pulsar/pull/2573 If we have a managed ledger, ml, and we write 2 entries to it, and both writes fail, both will end up calling ManagedLedgerImpl#ledgerClosed with the ledger the write failed on as a parameter. However, depending on timing, the second call to ledgerClosed could end up adding a new ledger to the ledger list even though the current ledger is _not_ failing (because the failing ledger was already replaced by the first call). This was the cause of a flake in ManagedLedgerErrorsTest#recoverLongTimeAfterMultipleWriteErrors, as reported in (#240). However, it's not possible to write a deterministic test for this, as the timing needs to be very precise: the failing addComplete needs to run before the first error handling completes, but the runnable with ledgerClosed for the second failure needs to run after the first error handling completes, but before the write resends triggered by the first error handling complete.
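The guard named in the pull request title (only close on error if the failed ledger is still the current one) can be sketched as follows. This is a simplified model and not ManagedLedgerImpl: `LedgerRolloverSketch`, its fields, and the id scheme are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ledger rollover on write failure; not ManagedLedgerImpl.
class LedgerRolloverSketch {

    private final List<Long> ledgers = new ArrayList<>();
    private long currentLedgerId;
    private long nextLedgerId;

    LedgerRolloverSketch(long firstLedgerId) {
        this.currentLedgerId = firstLedgerId;
        this.nextLedgerId = firstLedgerId + 1;
        ledgers.add(firstLedgerId);
    }

    // Called once per failed write, with the ledger that the write failed on.
    synchronized void ledgerClosed(long failedLedgerId) {
        if (failedLedgerId != currentLedgerId) {
            // Stale notification: an earlier failure already replaced this ledger,
            // so the current ledger is healthy and must not be rolled over.
            return;
        }
        currentLedgerId = nextLedgerId++;
        ledgers.add(currentLedgerId);
    }

    synchronized List<Long> ledgers() {
        return new ArrayList<>(ledgers);
    }

    public static void main(String[] args) {
        LedgerRolloverSketch ml = new LedgerRolloverSketch(1L);
        ml.ledgerClosed(1L); // first failed write rolls over to ledger 2
        ml.ledgerClosed(1L); // second failed write on the same ledger is ignored
        System.out.println(ml.ledgers());
    }
}
```

Without the id check, the second call would append a third ledger even though ledger 2 never failed, which is the spurious extra ledger the description attributes the flaky test to.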
[GitHub] merlimat commented on a change in pull request #2535: Add ledger op timeout to avoid topics stuck on ledger-creation
merlimat commented on a change in pull request #2535: Add ledger op timeout to avoid topics stuck on ledger-creation URL: https://github.com/apache/incubator-pulsar/pull/2535#discussion_r217344073 ## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java ## @@ -56,6 +56,7 @@ private boolean autoSkipNonRecoverableData; private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4); private long offloadAutoTriggerSizeThresholdBytes = -1; +private long opTimeoutSec = 5 * 60; //5 minutes Review comment: Maybe use a more explicit name like `metadataOperationsTimeoutSeconds` ? Also, the default in `ServiceConfiguration` seems to be 60sec
[GitHub] merlimat commented on issue #2567: Another fix for website build website/scripts/replace.js
merlimat commented on issue #2567: Another fix for website build website/scripts/replace.js URL: https://github.com/apache/incubator-pulsar/pull/2567#issuecomment-420968758 run java8 tests
[GitHub] merlimat commented on issue #2508: PIP-22: Dead Letter Topic
merlimat commented on issue #2508: PIP-22: Dead Letter Topic URL: https://github.com/apache/incubator-pulsar/pull/2508#issuecomment-420968603 retest this please
[GitHub] massakam opened a new pull request #2572: Shorten the timeout value of C++ ZTS client
massakam opened a new pull request #2572: Shorten the timeout value of C++ ZTS client URL: https://github.com/apache/incubator-pulsar/pull/2572 C++ ZTS client waits 1 seconds for a response from ZTS server even if there is no response. I think there is a mistake in specifying curl options. The second argument of `curl_easy_setopt()` should be `CURLOPT_TIMEOUT_MS`, not `CURLOPT_TIMEOUT`. https://github.com/apache/incubator-pulsar/blob/8b2929b3e5403fb44653976dc74be287666f7b96/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc#L48 https://github.com/apache/incubator-pulsar/blob/8b2929b3e5403fb44653976dc74be287666f7b96/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc#L275