[GitHub] srkukarni opened a new pull request #2616: Fixed the null checking of retryDetails field
srkukarni opened a new pull request #2616: Fixed the null checking of retryDetails field
URL: https://github.com/apache/incubator-pulsar/pull/2616

### Motivation

In proto3, the default value of a message field is not null but a default instance whose members are all set to their default values, recursively. As a result, our null check did not correctly detect whether the field was set.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
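The distinction described in the motivation of #2616 can be illustrated with a minimal sketch. The classes below are hand-written stand-ins that only mimic how proto3-generated Java messages behave (a getter that never returns null, plus a `has...` presence method); they are assumptions for illustration, not the generated Pulsar classes.

```java
public class ProtoPresenceSketch {

    /** Hypothetical stand-in for a generated proto3 message. */
    static final class RetryDetails {
        private static final RetryDetails DEFAULT = new RetryDetails(0, "");
        final int maxMessageRetries;
        final String deadLetterTopic;

        RetryDetails(int maxMessageRetries, String deadLetterTopic) {
            this.maxMessageRetries = maxMessageRetries;
            this.deadLetterTopic = deadLetterTopic;
        }

        static RetryDetails getDefaultInstance() {
            return DEFAULT;
        }
    }

    /** Hypothetical stand-in for the enclosing message. */
    static final class FunctionDetails {
        // null is used internally to mean "field not set"
        private final RetryDetails retryDetails;

        FunctionDetails(RetryDetails retryDetails) {
            this.retryDetails = retryDetails;
        }

        /** Presence check: the reliable way to ask whether the field was set. */
        boolean hasRetryDetails() {
            return retryDetails != null;
        }

        /** Mimics the generated getter: never returns null. */
        RetryDetails getRetryDetails() {
            return retryDetails != null ? retryDetails : RetryDetails.getDefaultInstance();
        }
    }

    public static void main(String[] args) {
        FunctionDetails unset = new FunctionDetails(null);
        // A null check on the getter passes even though the field is unset.
        System.out.println("getter != null: " + (unset.getRetryDetails() != null));
        // The presence method reports the real state.
        System.out.println("hasRetryDetails: " + unset.hasRetryDetails());
    }
}
```

With a getter that behaves this way, a `!= null` test is always true, which is why a presence method is the safer check.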
[GitHub] merlimat commented on issue #2610: Prepare website for future releases without '-incubating' suffix
merlimat commented on issue #2610: Prepare website for future releases without '-incubating' suffix URL: https://github.com/apache/incubator-pulsar/pull/2610#issuecomment-423000219 run cpp tests
[GitHub] rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
rdhabalia commented on a change in pull request #2605: implement topic routing on a per record basis
URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r219004080

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ##

@@ -60,140 +62,153 @@
     private final String fqfn;
     private interface PulsarSinkProcessor {
-        void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception;
         TypedMessageBuilder newMessage(Record record) throws Exception;
         void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception;
-        abstract void close() throws Exception;
+        void close() throws Exception;
     }
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor {
-        private Producer producer;
+    private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor {
+        protected Map> publishProducers = new ConcurrentHashMap<>();
+        protected Schema schema;
-        @Override
-        public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception {
-            this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+        protected PulsarSinkProcessorBase(Schema schema) {
+            this.schema = schema;
         }
-        @Override
-        public TypedMessageBuilder newMessage(Record record) {
-            return producer.newMessage();
+        public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn)
+                throws PulsarClientException {
+            ProducerBuilder builder = client.newProducer(schema)
+                    .blockIfQueueFull(true)
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                    .compressionType(CompressionType.LZ4)
+                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                    .messageRouter(FunctionResultRouter.of())
+                    .topic(topic);
+            if (producerName != null) {
+                builder.producerName(producerName);
+            }
+
+            return builder
+                    .property("application", "pulsarfunction")
+                    .property("fqfn", fqfn).create();
         }
-        @Override
-        public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception {
-            msg.sendAsync();
+        protected Producer getProducer(String destinationTopic) {
+            return getProducer(destinationTopic, null, destinationTopic);
         }
-        @Override
-        public void close() throws Exception {
-            if (null != producer) {
+        protected Producer getProducer(String producerId, String producerName, String topicName) {
+
+            Producer producer = publishProducers.get(producerId);
+
+            if (producer == null) {
                 try {
-                    producer.close();
+                    Producer newProducer = createProducer(
+                            client,
+                            topicName,
+                            producerName,
+                            schema,
+                            fqfn);
+
+                    Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer);

Review comment: Sure. I mean, can we do something like: `return publishProducers.computeIfAbsent(producerId, createProducer(..));`
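The review suggestion above can be sketched as follows. Two details are worth noting: `computeIfAbsent` takes a mapping function rather than an already-created value, and that function cannot throw a checked exception, so a checked failure from the factory has to be wrapped. `FakeProducer`, `ProducerCacheSketch`, and the exception handling are hypothetical stand-ins for illustration, not the code from the pull request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCacheSketch {

    /** Hypothetical stand-in for a Pulsar producer. */
    static final class FakeProducer {
        final String topic;

        FakeProducer(String topic) {
            this.topic = topic;
        }
    }

    private final Map<String, FakeProducer> publishProducers = new ConcurrentHashMap<>();
    // Counts how many producers were actually created (for demonstration only).
    final AtomicInteger created = new AtomicInteger();

    /** Stand-in for a factory that may fail with a checked exception. */
    private FakeProducer createProducer(String topic) throws Exception {
        if (topic == null || topic.isEmpty()) {
            throw new Exception("topic must not be empty");
        }
        created.incrementAndGet();
        return new FakeProducer(topic);
    }

    /** Creates the producer at most once per id and returns the cached one afterwards. */
    FakeProducer getProducer(String producerId, String topic) {
        return publishProducers.computeIfAbsent(producerId, id -> {
            try {
                return createProducer(topic);
            } catch (Exception e) {
                // The mapping function cannot throw checked exceptions, so wrap it.
                throw new IllegalStateException("Failed to create producer for " + topic, e);
            }
        });
    }

    public static void main(String[] args) {
        ProducerCacheSketch cache = new ProducerCacheSketch();
        FakeProducer first = cache.getProducer("topic-a", "topic-a");
        FakeProducer second = cache.getProducer("topic-a", "topic-a");
        System.out.println("same instance: " + (first == second));
        System.out.println("producers created: " + cache.created.get());
    }
}
```

Compared with a `get` followed by `putIfAbsent`, this form avoids creating a second producer that then has to be closed when another thread wins the race.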
[GitHub] aahmed-se opened a new pull request #2617: Add sample docker-compose file for standalone
aahmed-se opened a new pull request #2617: Add sample docker-compose file for standalone URL: https://github.com/apache/incubator-pulsar/pull/2617 Add sample compose file for experimentation purposes.
[GitHub] aahmed-se commented on issue #382: Broker LoadFactor CPU NaN
aahmed-se commented on issue #382: Broker LoadFactor CPU NaN URL: https://github.com/apache/incubator-pulsar/issues/382#issuecomment-423003761 https://github.com/apache/incubator-pulsar/pull/2618
[GitHub] aahmed-se opened a new pull request #2618: Guard against NaN for double type metrics
aahmed-se opened a new pull request #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618 We check for NaN values and convert them to 0.0 for metric aggregations.
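A minimal sketch of the kind of guard described in #2618, assuming the aggregation is a plain sum. The class and method names are hypothetical and are not taken from the pull request.

```java
public class NanGuardSketch {

    /** Maps NaN to 0.0 so that one bad sample cannot turn the whole aggregate into NaN. */
    static double nanToZero(double value) {
        return Double.isNaN(value) ? 0.0 : value;
    }

    /** Sums the samples, treating NaN samples as 0.0. */
    static double sum(double... values) {
        double total = 0.0;
        for (double v : values) {
            total += nanToZero(v);
        }
        return total;
    }

    public static void main(String[] args) {
        // Without the guard this sum would be NaN.
        System.out.println(sum(1.5, Double.NaN, 2.5)); // prints 4.0
    }
}
```

`Double.isNaN` is used because `value == Double.NaN` is always false in Java.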
[GitHub] jiazhai commented on issue #2615: Azure offloader
jiazhai commented on issue #2615: Azure offloader URL: https://github.com/apache/incubator-pulsar/pull/2615#issuecomment-423005867 @david-streamlio, thanks for the work. "jcloud" was my typo. Since this PR already brings in some refinement work, would you please also help change "jcloud" to "jclouds" in the file path and code?
[GitHub] srkukarni closed pull request #2616: Fixed the null checking of retryDetails field
srkukarni closed pull request #2616: Fixed the null checking of retryDetails field
URL: https://github.com/apache/incubator-pulsar/pull/2616

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1e07516f59..32e878db05 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -548,7 +548,7 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
             pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
         }
 
-        if (this.instanceConfig.getFunctionDetails().getRetryDetails() != null) {
+        if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
             pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
             pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index afac7821cd..0e840a25f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -90,7 +90,7 @@ public void open(Map config, SourceContext sourceContext) throws
             cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
 
-        if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+        if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
             DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
             deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
             if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 4e2afa7c24..65c5847265 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,7 +37,7 @@
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
-    private int maxMessageRetries;
+    private Integer maxMessageRetries = -1;
     private String deadLetterTopic;
 
     private Map topicSchema = new TreeMap<>();
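The guard added in `PulsarSource` matters because the field became a boxed `Integer`: comparing a null `Integer` with `>=` unboxes it and throws a `NullPointerException`. The sketch below shows the short-circuit check in isolation; the class and method names are hypothetical and not part of the pull request.

```java
public class NullableRetrySketch {

    /** Returns true only when a retry count is present and non-negative. */
    static boolean shouldConfigureDeadLetter(Integer maxMessageRetries) {
        // The null check must come first; && short-circuits before unboxing.
        return maxMessageRetries != null && maxMessageRetries >= 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldConfigureDeadLetter(null)); // prints false
        System.out.println(shouldConfigureDeadLetter(-1));   // prints false
        System.out.println(shouldConfigureDeadLetter(3));    // prints true
    }
}
```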
[GitHub] jiazhai edited a comment on issue #2615: Azure offloader
jiazhai edited a comment on issue #2615: Azure offloader URL: https://github.com/apache/incubator-pulsar/pull/2615#issuecomment-423005867 @david-streamlio, thanks for the work. "jcloud" may be my typo. Since this PR already brings in some refinement work, would you please, if possible, also help change "jcloud" to "jclouds" in the file path and code?
[GitHub] massakam opened a new pull request #2619: Fix aggregation process of topic stats
massakam opened a new pull request #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619 When `exposeTopicLevelMetricsInPrometheus` is false, all values of `pulsar_msg_backlog` in the Prometheus metrics are always 0. This is caused by a mistake in the aggregation of the per-topic stats.
[GitHub] merlimat closed pull request #2610: Prepare website for future releases without '-incubating' suffix
merlimat closed pull request #2610: Prepare website for future releases without '-incubating' suffix
URL: https://github.com/apache/incubator-pulsar/pull/2610

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/site2/website/pages/en/download.js b/site2/website/pages/en/download.js
index cb87cbd047..cea6f26e00 100644
--- a/site2/website/pages/en/download.js
+++ b/site2/website/pages/en/download.js
@@ -28,16 +28,13 @@ function archiveUrl(version, type) {
 
 class Download extends React.Component {
   render() {
-    const latestRelease = releases[0];
-
-    const latestVersion = `${latestRelease}-incubating`
+    const latestVersion = releases[0];
     const latestArchiveMirrorUrl = getLatestArchiveMirrorUrl(latestVersion, 'bin');
     const latestSrcArchiveMirrorUrl = getLatestArchiveMirrorUrl(latestVersion, 'src');
     const latestArchiveUrl = distUrl(latestVersion, 'bin');
     const latestSrcArchiveUrl = distUrl(latestVersion, 'src')
 
-    const releaseInfo = releases.map(r => {
-      const version = `${r}-incubating`;
+    const releaseInfo = releases.map(version => {
       return {
         version: version,
         binArchiveUrl: archiveUrl(version, 'bin'),
diff --git a/site2/website/releases.json b/site2/website/releases.json
index 2966e8817f..9e4e3284c2 100644
--- a/site2/website/releases.json
+++ b/site2/website/releases.json
@@ -1,10 +1,10 @@
 [
-  "2.1.1",
-  "2.1.0",
-  "2.0.1",
-  "1.22.1",
-  "1.22.0",
-  "1.21.0",
-  "1.20.0",
-  "1.19.0"
+  "2.1.1-incubating",
+  "2.1.0-incubating",
+  "2.0.1-incubating",
+  "1.22.1-incubating",
+  "1.22.0-incubating",
+  "1.21.0-incubating",
+  "1.20.0-incubating",
+  "1.19.0-incubating"
 ]
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423035680 @jerrypeng I have removed the string converter. I have also removed the dependence on virtualenv and made the patch a lot simpler.
[GitHub] merlimat commented on a change in pull request #2593: Add support for running python functions with wheel file
merlimat commented on a change in pull request #2593: Add support for running python functions with wheel file
URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219030874

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ##

@@ -229,6 +229,10 @@ void processArguments() throws Exception {
                 description = "Path to the main Python file for the function (if the function is written in Python)",
                 listConverter = StringConverter.class)
         protected String pyFile;
 
+        @Parameter(
+                names = "--pywheel",

Review comment: Instead of having a new switch, could we reuse the `--py` and then disambiguate based on the file extension?
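One way the extension-based disambiguation suggested above could look is sketched below. It assumes wheel files end in `.whl` and plain Python files in `.py`; the class, enum, and method names are hypothetical and do not come from the pull request or from the Pulsar CLI.

```java
import java.util.Locale;

public class PyArtifactSketch {

    /** The two kinds of artifact a single --py value could point at. */
    enum Kind { PY_FILE, WHEEL }

    /** Classifies the path by its file extension, ignoring case. */
    static Kind classify(String path) {
        String lower = path.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".whl")) {
            return Kind.WHEEL;
        }
        if (lower.endsWith(".py")) {
            return Kind.PY_FILE;
        }
        throw new IllegalArgumentException("Unsupported Python artifact: " + path);
    }

    public static void main(String[] args) {
        System.out.println(classify("exclamation.py"));                 // prints PY_FILE
        System.out.println(classify("myfunction-1.0-py3-none-any.whl")); // prints WHEEL
    }
}
```

Rejecting unknown extensions explicitly keeps a mistyped path from being silently treated as a plain Python file.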
[GitHub] merlimat opened a new pull request #2620: Removed mentions to incubator-pulsar
merlimat opened a new pull request #2620: Removed mentions to incubator-pulsar URL: https://github.com/apache/incubator-pulsar/pull/2620 ### Motivation Removed mentions to incubator-pulsar
[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis
jerrypeng commented on issue #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-423039949 rerun integration tests
[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis
jerrypeng commented on issue #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-423039923 rerun java8 tests
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423042653 @merlimat I have removed the new runtime. Please take a look again. Thanks!
[GitHub] sijie commented on issue #2596: (WIP) [tests] improve, stabilize or disable unit tests
sijie commented on issue #2596: (WIP) [tests] improve, stabilize or disable unit tests URL: https://github.com/apache/incubator-pulsar/pull/2596#issuecomment-423044790 run cpp tests
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219038542

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java ##

@@ -34,6 +38,7 @@ public CmdSchemas(PulsarAdmin admin) {
         jcommander.addCommand("get", new GetSchema());
         jcommander.addCommand("delete", new DeleteSchema());
         jcommander.addCommand("upload", new UploadSchema());
+        jcommander.addCommand("pojo", new PojoSchema());

Review comment: `pojo` is not an "action" here. Why can't we add the options to `upload`? I think that is the better place.
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219040732

## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/TestSchema.java ##

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Test schema operations
+ */
+@Slf4j
+public class TestSchema extends PulsarClusterTestBase {
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .clusterName(clusterName)
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Test(dataProvider = "ServiceAndAdminUrls")
+    public void testJarPojoSchemaUploadAvro(String serviceUrl, String adminUrl) throws Exception {
+
+        ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+                "schemas",
+                "pojo", "--jar", "/pulsar/examples/api-examples.jar", "--type", "avro",
+                "--class-name", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+                "persistent://public/default/pojo-avro");
+
+        Assert.assertEquals(containerExecResult.getExitCode(), 0);

Review comment: When you upload the schema, you need to verify that you can create a producer and a consumer with the schema and produce/consume messages.
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219040391

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java ##

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+public class SchemaExtractor {
+
+    public static String getJsonSchemaInfo(Class clazz) throws UnsupportedEncodingException {
+
+        return new String(Schema.JSON(clazz).getSchemaInfo().toString().getBytes(), StandardCharsets.UTF_8.toString());

Review comment:
1. `getSchemaInfo().toString().getBytes()` => `getSchemaInfo().getSchema()`
2. You don't need `toString` on `UTF_8`. It can just be `new String(schema, UTF_8)`.
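The second point of the review comment on `SchemaExtractor` can be shown in isolation. The `String(byte[], Charset)` constructor does not declare `UnsupportedEncodingException`, unlike `String(byte[], String)`, so the `throws` clause becomes unnecessary. The byte array below is a made-up sample, not the output of Pulsar's `getSchema()`, and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class Utf8DecodeSketch {

    /** Decodes schema bytes as UTF-8; no checked exception is declared. */
    static String decode(byte[] schemaBytes) {
        return new String(schemaBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample bytes standing in for a schema definition.
        byte[] sample = "{\"type\":\"record\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(sample)); // prints {"type":"record"}
    }
}
```

Passing the charset explicitly on both the encode and decode side also avoids depending on the platform default encoding.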
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219040657

## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/TestSchema.java ##

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Test schema operations
+ */
+@Slf4j
+public class TestSchema extends PulsarClusterTestBase {

Review comment: Don't try to create a new test case; add the tests to `CLITest`.
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219038438

## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java ##

@@ -83,4 +88,44 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Provide the schema via a topic")
+    private class PojoSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List params;
+
+        @Parameter(names = { "-j", "--jar" }, description = "jar filepath", required = true)
+        private String jarFilePath;
+
+        @Parameter(names = { "-t", "--type" }, description = "type avro or json", required = true)
+        private String type;
+
+        @Parameter(names = { "-c", "--class-name" }, description = "class name of pojo", required = true)

Review comment: Change `--class-name` to `--classname`. `--classname` is more common than `--class-name`.
[GitHub] sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
sijie commented on a change in pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/incubator-pulsar/pull/2578#discussion_r219040489

## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java ##

@@ -232,4 +232,25 @@ public void testGrantPermissionsAuthorizationDisabled() throws Exception {
         }
     }
 
+    @Test
+    public void cliJarSchemaUploadTest() throws Exception {

Review comment: What are you trying to test in this method?
[GitHub] sijie commented on issue #2590: Issue #2584: unacked message is not redelivered on time
sijie commented on issue #2590: Issue #2584: unacked message is not redelivered on time URL: https://github.com/apache/incubator-pulsar/pull/2590#issuecomment-423048311 retest this please
[GitHub] sijie commented on issue #2611: Make dockerUtils use container name for test exec
sijie commented on issue #2611: Make dockerUtils use container name for test exec
URL: https://github.com/apache/incubator-pulsar/pull/2611#issuecomment-423048586

> the log configure does not, the DockerUtils changes fixes that issue.

Can you please comment inline and address each comment individually? My comments are for different places. In particular, I don't see that the log configuration file is really needed.
[GitHub] jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis
jerrypeng commented on a change in pull request #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#discussion_r219041396 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java ## @@ -60,140 +62,153 @@ private final String fqfn; private interface PulsarSinkProcessor { -void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception; TypedMessageBuilder newMessage(Record record) throws Exception; void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception; -abstract void close() throws Exception; +void close() throws Exception; } -private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { -private Producer producer; +private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { +protected Map> publishProducers = new ConcurrentHashMap<>(); +protected Schema schema; -@Override -public void initializeOutputProducer(String outputTopic, Schema schema, String fqfn) throws Exception { -this.producer = AbstractOneOuputTopicProducers.createProducer( -client, pulsarSinkConfig.getTopic(), null, schema, fqfn); +protected PulsarSinkProcessorBase(Schema schema) { +this.schema = schema; } -@Override -public TypedMessageBuilder newMessage(Record record) { -return producer.newMessage(); +public Producer createProducer(PulsarClient client, String topic, String producerName, Schema schema, String fqfn) +throws PulsarClientException { +ProducerBuilder builder = client.newProducer(schema) +.blockIfQueueFull(true) +.enableBatching(true) +.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) +.compressionType(CompressionType.LZ4) +.hashingScheme(HashingScheme.Murmur3_32Hash) // +.messageRoutingMode(MessageRoutingMode.CustomPartition) +.messageRouter(FunctionResultRouter.of()) +.topic(topic); +if (producerName != null) { +builder.producerName(producerName); +} + +return builder 
+.property("application", "pulsarfunction") +.property("fqfn", fqfn).create(); } -@Override -public void sendOutputMessage(TypedMessageBuilder msg, Record record) throws Exception { -msg.sendAsync(); +protected Producer getProducer(String destinationTopic) { +return getProducer(destinationTopic, null, destinationTopic); } -@Override -public void close() throws Exception { -if (null != producer) { +protected Producer getProducer(String producerId, String producerName, String topicName) { + +Producer producer = publishProducers.get(producerId); + +if (producer == null) { try { -producer.close(); +Producer newProducer = createProducer( +client, +topicName, +producerName, +schema, +fqfn); + +Producer existingProducer = publishProducers.putIfAbsent(producerId, newProducer); Review comment: yup we can just do that. This logic is from before This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
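Note on the diff above: the visible part of `getProducer` follows a get, create, `putIfAbsent` sequence. The alternative the reviewers agreed on is not shown in this excerpt, so the following is only a minimal sketch of the pattern as it appears in the diff, with the losing instance closed when two threads race. The class name `ResourceCache` and its factory parameter are hypothetical stand-ins and are not part of the Pulsar code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the producer cache in the diff; not Pulsar code.
final class ResourceCache<R extends AutoCloseable> {
    private final Map<String, R> cache = new ConcurrentHashMap<>();
    private final Function<String, R> factory;

    ResourceCache(Function<String, R> factory) {
        this.factory = factory;
    }

    R get(String id) {
        R existing = cache.get(id);
        if (existing != null) {
            return existing;
        }
        // Create outside the map, then publish atomically.
        R created = factory.apply(id);
        R raced = cache.putIfAbsent(id, created);
        if (raced == null) {
            return created;
        }
        // Another thread published first: discard our instance and reuse theirs.
        try {
            created.close();
        } catch (Exception e) {
            throw new IllegalStateException("failed to close duplicate resource for " + id, e);
        }
        return raced;
    }
}
```

Creating the resource before calling `putIfAbsent` keeps a potentially slow creation step out of the map's internal locking, at the cost of occasionally building an instance that is immediately closed.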
[GitHub] sijie commented on a change in pull request #2618: Guard against NaN for double type metrics
sijie commented on a change in pull request #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#discussion_r219041485 ## File path: tests/docker-images/latest-version-image/ssl/ca.cert.pem ## @@ -1,29 +0,0 @@ --BEGIN CERTIFICATE- Review comment: why change this file? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2618: Guard against NaN for double type metrics
sijie commented on a change in pull request #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#discussion_r219041640 ## File path: pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java ## @@ -285,31 +285,47 @@ public double getAllocatedCPU() { } public void setAllocatedCPU(double allocatedCPU) { -this.allocatedCPU = allocatedCPU; +if (Double.isFinite(allocatedCPU)) { Review comment: make a util function for that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2618: Guard against NaN for double type metrics
sijie commented on a change in pull request #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#discussion_r219042150 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java ## @@ -655,12 +655,12 @@ public void writeResourceQuotasToZooKeeper() throws Exception { // write the load factors setDynamicConfigurationToZK(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, new HashMap() { { -put(SETTING_NAME_LOAD_FACTOR_CPU, Double.toString(realtimeCpuLoadFactor)); +put(SETTING_NAME_LOAD_FACTOR_CPU, Double.isFinite(realtimeCpuLoadFactor) ? Double.toString(realtimeCpuLoadFactor) : Double.toString(0.0)); Review comment: better to have a util function ``` double getFiniteValue(double value, double defaultValue) { return Double.isFinite(value) ? value : defaultValue; } ``` so you can call `Double.toString(getFiniteValue(realtimeCpuLoadFactor, 0.0))`; and this util function can be reused This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
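For reference, the helper proposed in this comment could be packaged roughly as below. This is a sketch only: the holder class name `FiniteValues` is an assumption, and where such a utility would actually live in the Pulsar code base is not stated in the thread.

```java
// Hypothetical holder class; only getFiniteValue comes from the review comment.
final class FiniteValues {
    private FiniteValues() {
    }

    /**
     * Returns value when it is finite (neither NaN nor +/-Infinity),
     * otherwise returns defaultValue.
     */
    static double getFiniteValue(double value, double defaultValue) {
        return Double.isFinite(value) ? value : defaultValue;
    }
}
```

With this in place, the call site from the diff would read `Double.toString(FiniteValues.getFiniteValue(realtimeCpuLoadFactor, 0.0))`, and the setters in `LoadReport` could use the same helper.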
[GitHub] sijie commented on issue #2617: Add sample docker-compose file with pulsar standalone
sijie commented on issue #2617: Add sample docker-compose file with pulsar standalone URL: https://github.com/apache/incubator-pulsar/pull/2617#issuecomment-423049972 please fix license header This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2614: Debezium: add PulsarDatabaseHistory for debezium
sijie commented on a change in pull request #2614: Debezium: add PulsarDatabaseHistory for debezium URL: https://github.com/apache/incubator-pulsar/pull/2614#discussion_r219042567 ## File path: pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java ## @@ -0,0 +1,243 @@ +/* + * Copyright Debezium Authors. Review comment: this license header might need to be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2614: Debezium: add PulsarDatabaseHistory for debezium
sijie commented on a change in pull request #2614: Debezium: add PulsarDatabaseHistory for debezium URL: https://github.com/apache/incubator-pulsar/pull/2614#discussion_r219042655 ## File path: pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java ## @@ -0,0 +1,209 @@ +/* + * Copyright Debezium Authors. Review comment: the license header needs to be fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie commented on a change in pull request #2504: Add Presto Sql Test
sijie commented on a change in pull request #2504: Add Presto Sql Test URL: https://github.com/apache/incubator-pulsar/pull/2504#discussion_r219043885 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java ## @@ -0,0 +1,66 @@ +package org.apache.pulsar.tests.integration.presto; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.util.stream.Stream; + +import static java.util.stream.Collectors.joining; +import static org.assertj.core.api.Assertions.assertThat; + +@Slf4j +public class TestBasicPresto extends PulsarClusterTestBase { + +@BeforeSuite +@Override +public void setupCluster() throws Exception { +final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) +.filter(s -> s != null && !s.isEmpty()) +.collect(joining("-")); + +PulsarClusterSpec spec = PulsarClusterSpec.builder() +.numBookies(2) +.numBrokers(1) +.enablePrestoWorker(true) +.clusterName(clusterName) +.build(); + +log.info("Setting up cluster {} with {} bookies, {} brokers", +spec.clusterName(), spec.numBookies(), spec.numBrokers()); + +pulsarCluster = PulsarCluster.forSpec(spec); +pulsarCluster.start(); + +log.info("Cluster {} is setup with presto worker", spec.clusterName()); +} + +@Test +public void testDefaultCatalog() throws Exception { +ContainerExecResult containerExecResult = execQuery("show catalogs;"); +assertThat(containerExecResult.getExitCode()).isEqualTo(0); +assertThat(containerExecResult.getStdout()).contains("pulsar", "system"); +} + +@AfterSuite +@Override +public void tearDownCluster() { 
+super.tearDownCluster(); +} + +public static ContainerExecResult execQuery(final String query) throws Exception { +ContainerExecResult containerExecResult; + +containerExecResult = pulsarCluster.getPrestoWorkerContainer() Review comment: can you test an actual SQL query rather than just `show catalogs`, which doesn't actually run any SQL execution? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam commented on issue #2619: Fix aggregation process of topic stats
massakam commented on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423052925 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis
jerrypeng commented on issue #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-423052921 rerun integration tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available
jerrypeng commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available URL: https://github.com/apache/incubator-pulsar/pull/2613#discussion_r219048080 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java ## @@ -52,6 +52,10 @@ List newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); +if (heartBeatWorkerId != null && workerIdToAssignment.get(heartBeatWorkerId) == null) { Review comment: I would suggest moving the logic of the heartbeat function outside of the abstraction of the scheduler. The scheduler is just supposed to assign instances to workers based on some strategy, while the heartbeat function scheduling strategy will always be the same. In the future, if we don't move this logic out and we want to add alternative scheduling strategies, every scheduling strategy will need this logic. I would suggest moving this logic into the SchedulerManager and just assigning it appropriately there. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
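The separation suggested in this comment might look roughly like the sketch below: heartbeat instances are pinned to their owner worker (or skipped when the owner is absent) before any pluggable strategy runs, so each strategy only sees regular instances. Everything here is hypothetical and simplified for illustration: instances and workers are plain strings, the `heartbeat-` prefix is an invented naming convention, and none of the names correspond to the real `SchedulerManager` or `RoundRobinScheduler` APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; types and names are not taken from Pulsar.
final class HeartbeatAwareAssigner {
    // Invented convention: a heartbeat instance is named "heartbeat-<workerId>".
    static final String HEARTBEAT_PREFIX = "heartbeat-";

    /** A pluggable strategy that knows nothing about heartbeat instances. */
    interface Strategy {
        Map<String, String> assign(List<String> instances, Set<String> workers);
    }

    /** Simple round-robin over workers in sorted order. */
    static Map<String, String> roundRobin(List<String> instances, Set<String> workers) {
        Map<String, String> assignments = new LinkedHashMap<>();
        List<String> ordered = new ArrayList<>(new TreeSet<>(workers));
        if (ordered.isEmpty()) {
            return assignments;
        }
        for (int i = 0; i < instances.size(); i++) {
            assignments.put(instances.get(i), ordered.get(i % ordered.size()));
        }
        return assignments;
    }

    /** Handles heartbeat instances once, then delegates the rest to the strategy. */
    static Map<String, String> schedule(List<String> unassigned, Set<String> liveWorkers, Strategy strategy) {
        Map<String, String> assignments = new LinkedHashMap<>();
        List<String> regular = new ArrayList<>();
        for (String instance : unassigned) {
            if (instance.startsWith(HEARTBEAT_PREFIX)) {
                String owner = instance.substring(HEARTBEAT_PREFIX.length());
                if (liveWorkers.contains(owner)) {
                    assignments.put(instance, owner);
                }
                // Owner not available: leave the heartbeat instance unscheduled.
            } else {
                regular.add(instance);
            }
        }
        assignments.putAll(strategy.assign(regular, liveWorkers));
        return assignments;
    }
}
```

A caller would pass the strategy as a method reference, for example `HeartbeatAwareAssigner.schedule(instances, workers, HeartbeatAwareAssigner::roundRobin)`, and a new strategy could be added without repeating the heartbeat check.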
[GitHub] rdhabalia commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available
rdhabalia commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available URL: https://github.com/apache/incubator-pulsar/pull/2613#discussion_r219060538 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java ## @@ -52,6 +52,10 @@ List newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); +if (heartBeatWorkerId != null && workerIdToAssignment.get(heartBeatWorkerId) == null) { Review comment: fixed it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed pull request #2590: Issue #2584: unacked message is not redelivered on time
sijie closed pull request #2590: Issue #2584: unacked message is not redelivered on time URL: https://github.com/apache/incubator-pulsar/pull/2590 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 2c94966aa3..dfeb4afcc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -217,7 +217,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, Set messageSet = Sets.newHashSet(); for (int i = 0; i < numMessages; i++) { future_msg = consumer.receiveAsync(); -Thread.sleep(10); msg = future_msg.get(); String receivedMessage = new String(msg.getData()); log.info("Received message: [{}]", receivedMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 61bdad034a..df383b532f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -2662,4 +2663,50 @@ public void received(Consumer consumer, Message message) 
assertEquals(latch.getCount(), 1); consumer.close(); } + +/** + * Ack timeout message is redelivered on time. + * Related github issue #2584 + */ +@Test +public void testAckTimeoutRedeliver() throws Exception { +log.info("-- Starting {} test --", methodName); + +// create consumer and producer +ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer() +.topic("persistent://my-property/my-ns/ack-timeout-topic") +.subscriptionName("subscriber-1") +.ackTimeout(1, TimeUnit.SECONDS) +.subscriptionType(SubscriptionType.Shared) +.acknowledgmentGroupTime(0, TimeUnit.SECONDS) +.subscribe(); + +Producer producer = pulsarClient.newProducer() +.topic("persistent://my-property/my-ns/ack-timeout-topic") +.enableBatching(false) +.messageRoutingMode(MessageRoutingMode.SinglePartition) +.create(); + +// (1) Produced one Message +String content = "my-message-will-ack-timeout"; +producer.send(content.getBytes()); + +// (2) consumer to receive messages, and not ack +Message message = consumer.receive(); + +// (3) should be re-delivered once ack-timeout. 
+Thread.sleep(1000); +message = consumer.receive(200, TimeUnit.MILLISECONDS); +assertNotNull(message); + +Thread.sleep(1000); +message = consumer.receive(200, TimeUnit.MILLISECONDS); +assertNotNull(message); + +assertEquals(content, new String(message.getData())); + +producer.close(); +consumer.close(); +log.info("-- Exiting {} test --", methodName); +} } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index da53760d2b..e178febdcb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -235,16 +235,17 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { private static int receiveAllMessage(Consumer consumer, boolean ackMessages) throws Exception { int messagesReceived = 0; -Message msg = consumer.receive(1, TimeUnit.SECONDS); +Message msg = consumer.receive(200, TimeUnit.MILLISECONDS); while (msg != null) { ++messagesReceived; -log.info("Consumer received {}", new String(msg.getData())); +log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData())); if (ackMessages) { consumer.acknowledge(msg)
[GitHub] sijie commented on issue #2584: unacked message is not redelivered in setting ackTimeout
sijie commented on issue #2584: unacked message is not redelivered in setting ackTimeout URL: https://github.com/apache/incubator-pulsar/issues/2584#issuecomment-423076273 This is fixed by #2590 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie closed issue #2584: unacked message is not redelivered in setting ackTimeout
sijie closed issue #2584: unacked message is not redelivered in setting ackTimeout URL: https://github.com/apache/incubator-pulsar/issues/2584 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia opened a new pull request #2621: Upgrade kinesis version to fix extract binary in memory
rdhabalia opened a new pull request #2621: Upgrade kinesis version to fix extract binary in memory URL: https://github.com/apache/incubator-pulsar/pull/2621 ### Motivation The old Kinesis version had a major bug: it copied native files into main memory, which can cause an out-of-memory error if multiple kinesis-producers are created concurrently. This has been fixed in `0.12.9`. aws-kinesis: https://github.com/awslabs/amazon-kinesis-producer/pull/198 Upgrade kinesis version: `0.12.9` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on issue #2621: Upgrade kinesis version to fix extract binary in memory
rdhabalia commented on issue #2621: Upgrade kinesis version to fix extract binary in memory URL: https://github.com/apache/incubator-pulsar/pull/2621#issuecomment-423081539 rerun cpp tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #2593: Add support for running python functions with wheel file
ivankelly commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219082009 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: where is this unzipped to? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #2593: Add support for running python functions with wheel file
ivankelly commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219081797 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: unzip is a new dependency. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam commented on issue #2619: Fix aggregation process of topic stats
massakam commented on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423102825 rerun java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codelipenghui opened a new pull request #2622: Add Maven Source Plugin.
codelipenghui opened a new pull request #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622 ### Motivation Allow maven or gradle to download pulsar client source. ### Modifications Add maven source plugin in pulsar.pom This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam commented on issue #2620: Removed mentions to incubator-pulsar
massakam commented on issue #2620: Removed mentions to incubator-pulsar URL: https://github.com/apache/incubator-pulsar/pull/2620#issuecomment-423132805 It seems that this part should be removed. https://github.com/apache/incubator-pulsar/blob/152e5a3ed6e9b9c4c2446a9d893ba8c863d9700c/distribution/offloaders/src/assemble/offloaders.xml#L31-L35 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file
srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219158166 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: it is unzipped into the same tmp directory the .whl file is copied to. The -d option controls that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file
srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219158728 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: unzip is indeed a new dependency. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423179516 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #2612: Allow byte[] keys for messages (#1016)
ivankelly commented on a change in pull request #2612: Allow byte[] keys for messages (#1016) URL: https://github.com/apache/incubator-pulsar/pull/2612#discussion_r219185483 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java ## @@ -89,6 +89,14 @@ */ TypedMessageBuilder key(String key); +/** + * Sets the bytes of the key of the message for routing policy. + * Internally the bytes will be base64 encoded. + * + * @param key routing key for message, in byte array form + */ +TypedMessageBuilder keyBytes(byte[] key); Review comment: symmetry with the getKeyBytes() call on Message. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
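The javadoc in this diff says the key bytes are base64 encoded internally. As a rough illustration of what that round trip involves, here is a minimal sketch using the JDK's `java.util.Base64`. The class and method names are hypothetical, and how the Pulsar client actually stores or flags an encoded key is not shown in this thread.

```java
import java.util.Base64;

// Hypothetical helper illustrating the encoding step; not the Pulsar implementation.
final class KeyBytesSketch {
    private KeyBytesSketch() {
    }

    /** Encodes an arbitrary byte[] key into a string that is safe to carry as a message key. */
    static String encodeKey(byte[] key) {
        return Base64.getEncoder().encodeToString(key);
    }

    /** Recovers the original bytes from a key produced by encodeKey. */
    static byte[] decodeKey(String encodedKey) {
        return Base64.getDecoder().decode(encodedKey);
    }
}
```

A receiver needs some way to know that a key was encoded before decoding it; that signalling is outside the scope of this sketch.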
[GitHub] merlimat commented on issue #2622: Add Maven Source Plugin.
merlimat commented on issue #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622#issuecomment-423242331 @codelipenghui I think the sources should be already attached and published on maven repo: http://central.maven.org/maven2/org/apache/pulsar/pulsar-client/2.1.1-incubating/ Is it failing to find them? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #2593: Add support for running python functions with wheel file
merlimat commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219228545 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: `unzip` might not be always installed (especially in Docker images where everything is stripped to bone). Python has a native way to deal with zip files, without invoking the CLI command: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile.extract This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #2620: Removed mentions to incubator-pulsar
merlimat commented on issue #2620: Removed mentions to incubator-pulsar URL: https://github.com/apache/incubator-pulsar/pull/2620#issuecomment-423247655 Good catch @massakam. Removed the reference to DISCLAIMER
[GitHub] srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file
srkukarni commented on a change in pull request #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#discussion_r219239279 ## File path: pulsar-functions/instance/src/main/python/python_instance_main.py ## @@ -74,6 +73,11 @@ def main(): args = parser.parse_args() function_details = Function_pb2.FunctionDetails() json_format.Parse(args.function_details, function_details) + + if os.path.splitext(str(args.py))[1] == '.whl': +os.system("unzip -d %s -o %s" % (os.path.dirname(str(args.py)), str(args.py))) Review comment: I have removed the unzip dependency by switching over to zipfile.
[GitHub] merlimat commented on issue #2622: Add Maven Source Plugin.
merlimat commented on issue #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622#issuecomment-423258529 I think I see the problem: `pulsar-client` is the shaded module. It includes `pulsar-client-original` and its dependencies. For `pulsar-client-original` the sources and javadoc jars appear to be correct (http://central.maven.org/maven2/org/apache/pulsar/pulsar-client-original/2.1.1-incubating/), but for `pulsar-client` the sources jar is empty, because the module itself has no Java sources. I don't think this PR will solve the problem either. We need a way to attach the original sources (and javadoc) to the shaded artifact.
[GitHub] sijie commented on issue #2622: Add Maven Source Plugin.
sijie commented on issue #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622#issuecomment-423262722 @merlimat I think the sources jar is missing from the snapshot repository: https://repository.apache.org/content/repositories/snapshots/org/apache/pulsar/pulsar-client-original/2.2.0-incubating-SNAPSHOT/ If we can configure the snapshot job to publish the sources jar, that's probably enough for @codelipenghui
[GitHub] merlimat commented on issue #2622: Add Maven Source Plugin.
merlimat commented on issue #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622#issuecomment-423265630 I still worry that the source jar for `pulsar-client` will be empty, given that module is just for shading purpose and has no sources.
[GitHub] sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic
sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic URL: https://github.com/apache/incubator-pulsar/pull/2585#issuecomment-423268786 run java8 tests
[GitHub] sijie closed pull request #2596: [tests] improve PersistentFailoverE2ETest and add more logging to RawReaderTest
sijie closed pull request #2596: [tests] improve PersistentFailoverE2ETest and add more logging to RawReaderTest URL: https://github.com/apache/incubator-pulsar/pull/2596 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 3e293afc61..9cf55dca5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -100,14 +100,14 @@ private void verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListe } private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception { -Integer pid = listener.activeQueue.poll(10, TimeUnit.SECONDS); +Integer pid = listener.activeQueue.take(); assertNotNull(pid); assertEquals(partitionId, pid.intValue()); assertNull(listener.inActiveQueue.poll()); } private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception { -Integer pid = listener.inActiveQueue.poll(10, TimeUnit.SECONDS); +Integer pid = listener.inActiveQueue.take(); assertNotNull(pid); assertEquals(partitionId, pid.intValue()); assertNull(listener.activeQueue.poll()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 699e9baae6..9d353bdaee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
@@ -221,7 +221,10 @@ public void testFlowControl() throws Exception { for (Future f : futures) { try (RawMessage m = f.get(1, TimeUnit.SECONDS)) { // Assert each key is unique -Assert.assertTrue(keys.add(extractKey(m))); +String key = extractKey(m); +Assert.assertTrue( +keys.add(key), +"Received duplicated key '" + key + "' : already received keys = " + keys); } catch (TimeoutException te) { timeouts++; }
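The first hunk of this diff swaps `poll(10, TimeUnit.SECONDS)` for `take()` on the listener queues. The difference between the two calls can be sketched with a plain `LinkedBlockingQueue`; the class `QueueWaitSketch` and its method names are hypothetical, and the queue here only stands in for the listener's `activeQueue`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueWaitSketch {

    // Bounded wait: returns null if nothing arrives within the timeout.
    static Integer pollWithTimeout(BlockingQueue<Integer> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    // Unbounded wait: blocks until an element is available and never returns null.
    static Integer takeBlocking(BlockingQueue<Integer> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while taking", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> activeQueue = new LinkedBlockingQueue<>();

        // Nothing has been published yet, so the bounded wait gives up with null.
        System.out.println("poll on empty queue: " + pollWithTimeout(activeQueue, 50));

        // Once an element is present, take() returns it immediately.
        activeQueue.add(3);
        System.out.println("take after add: " + takeBlocking(activeQueue));
    }
}
```

Because `take()` never returns null, an `assertNotNull` placed after it cannot fail; a missing event would show up as the test hanging until some outer timeout fires rather than as an assertion error.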
[GitHub] sijie opened a new pull request #2623: [tests] run docker system prune to clean up dangling network/processes
sijie opened a new pull request #2623: [tests] run docker system prune to clean up dangling network/processes URL: https://github.com/apache/incubator-pulsar/pull/2623
[GitHub] jerrypeng commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available
jerrypeng commented on a change in pull request #2613: Avoid scheduling heartbeat function if owner-worker not available URL: https://github.com/apache/incubator-pulsar/pull/2613#discussion_r219256717 ## File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java ## @@ -52,6 +52,10 @@ List newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); +if (heartBeatWorkerId != null && workerIdToAssignment.get(heartBeatWorkerId) == null) { Review comment: I would suggest moving all the logic concerning the heartbeat function out of the scheduler. You can just create an assignment for the heartbeat function in the scheduler manager. Ideally we don't want any of the heartbeat function logic within the scheduler.
[GitHub] sijie commented on issue #2622: Add Maven Source Plugin.
sijie commented on issue #2622: Add Maven Source Plugin. URL: https://github.com/apache/incubator-pulsar/pull/2622#issuecomment-423272540 > I still worry that the source jar for pulsar-client will be empty, given that module is just for shading purpose and has no sources. Yeah, I agree. We should fix that as well.
[GitHub] sijie opened a new pull request #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema
sijie opened a new pull request #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema URL: https://github.com/apache/incubator-pulsar/pull/2624 *Motivation* ByteBuffer is a variant of `bytes`, so it should be `SchemaType.BYTES`, not `SchemaType.BYTEBUFFER`. *Changes* - Fix bytebuffer schema type - Add bytebuf schema
[GitHub] sijie commented on issue #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema
sijie commented on issue #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema URL: https://github.com/apache/incubator-pulsar/pull/2624#issuecomment-423278674 This should be part of the 2.2 changes, since we should not even store `SchemaType.BYTEBUFFER`.
[GitHub] aahmed-se commented on issue #2618: Guard against NaN for double type metrics
aahmed-se commented on issue #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#issuecomment-423295125 added utility method
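The diff for this PR is not included in the thread, so the utility method mentioned above is not shown. As a rough illustration of what guarding a double metric against NaN can look like, here is a sketch; the class `MetricGuardSketch`, the method `nanToZero`, and the choice of `0.0` as the fallback are all assumptions, not the PR's actual code:

```java
public class MetricGuardSketch {

    // Hypothetical guard: replace NaN with 0.0 so downstream consumers
    // of the metric always see a number they can serialize and aggregate.
    static double nanToZero(double value) {
        return Double.isNaN(value) ? 0.0 : value;
    }

    public static void main(String[] args) {
        // A rate computed over an empty window: 0.0 / 0.0 evaluates to NaN in Java.
        double messages = 0.0;
        double elapsedSeconds = 0.0;
        double rawRate = messages / elapsedSeconds;

        System.out.println("raw rate is NaN: " + Double.isNaN(rawRate));
        System.out.println("guarded rate: " + nanToZero(rawRate));

        // Ordinary values pass through untouched.
        System.out.println("guarded 12.5: " + nanToZero(12.5));
    }
}
```

Note that `value == Double.NaN` is always false in Java, which is why the check goes through `Double.isNaN`.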
[GitHub] jerrypeng commented on issue #2605: implement topic routing on a per record basis
jerrypeng commented on issue #2605: implement topic routing on a per record basis URL: https://github.com/apache/incubator-pulsar/pull/2605#issuecomment-423296494 rerun java8 tests
[GitHub] merlimat opened a new pull request #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms
merlimat opened a new pull request #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms URL: https://github.com/apache/incubator-pulsar/pull/2625 ### Motivation The next release will not have the `-incubating` suffix.
[GitHub] aahmed-se commented on issue #2617: Add sample docker-compose file with pulsar standalone
aahmed-se commented on issue #2617: Add sample docker-compose file with pulsar standalone URL: https://github.com/apache/incubator-pulsar/pull/2617#issuecomment-423298085 Add license and packaging
[GitHub] merlimat commented on issue #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms
merlimat commented on issue #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms URL: https://github.com/apache/incubator-pulsar/pull/2625#issuecomment-423306156 run java8 tests
[GitHub] merlimat commented on issue #2618: Guard against NaN for double type metrics
merlimat commented on issue #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#issuecomment-423340233 run cpp tests run java8 tests run integration tests
[GitHub] merlimat commented on issue #2618: Guard against NaN for double type metrics
merlimat commented on issue #2618: Guard against NaN for double type metrics URL: https://github.com/apache/incubator-pulsar/pull/2618#issuecomment-423340070 rerun tests please
[GitHub] sijie commented on issue #2623: [tests] run docker system prune to clean up dangling network/processes
sijie commented on issue #2623: [tests] run docker system prune to clean up dangling network/processes URL: https://github.com/apache/incubator-pulsar/pull/2623#issuecomment-423340463 run java8 tests
[GitHub] merlimat commented on issue #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms
merlimat commented on issue #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms URL: https://github.com/apache/incubator-pulsar/pull/2625#issuecomment-423341209 run java8 tests run integration tests
[GitHub] merlimat commented on issue #2620: Removed mentions to incubator-pulsar
merlimat commented on issue #2620: Removed mentions to incubator-pulsar URL: https://github.com/apache/incubator-pulsar/pull/2620#issuecomment-423344810 run cpp tests run java8 tests
[GitHub] aahmed-se commented on a change in pull request #2504: Add Presto Sql Test
aahmed-se commented on a change in pull request #2504: Add Presto Sql Test URL: https://github.com/apache/incubator-pulsar/pull/2504#discussion_r219331833 ## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java ## @@ -0,0 +1,66 @@ +package org.apache.pulsar.tests.integration.presto; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.util.stream.Stream; + +import static java.util.stream.Collectors.joining; +import static org.assertj.core.api.Assertions.assertThat; + +@Slf4j +public class TestBasicPresto extends PulsarClusterTestBase { + +@BeforeSuite +@Override +public void setupCluster() throws Exception { +final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) +.filter(s -> s != null && !s.isEmpty()) +.collect(joining("-")); + +PulsarClusterSpec spec = PulsarClusterSpec.builder() +.numBookies(2) +.numBrokers(1) +.enablePrestoWorker(true) +.clusterName(clusterName) +.build(); + +log.info("Setting up cluster {} with {} bookies, {} brokers", +spec.clusterName(), spec.numBookies(), spec.numBrokers()); + +pulsarCluster = PulsarCluster.forSpec(spec); +pulsarCluster.start(); + +log.info("Cluster {} is setup with presto worker", spec.clusterName()); +} + +@Test +public void testDefaultCatalog() throws Exception { +ContainerExecResult containerExecResult = execQuery("show catalogs;"); +assertThat(containerExecResult.getExitCode()).isEqualTo(0); +assertThat(containerExecResult.getStdout()).contains("pulsar", "system"); +} + +@AfterSuite +@Override +public void tearDownCluster() { 
+super.tearDownCluster(); +} + +public static ContainerExecResult execQuery(final String query) throws Exception { +ContainerExecResult containerExecResult; + +containerExecResult = pulsarCluster.getPrestoWorkerContainer() Review comment: I have added a sql test
[GitHub] merlimat closed pull request #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms
merlimat closed pull request #2625: Renamed 2.2.0-incubating-SNAPSHOT into 2.2.0-SNAPSHOT in poms URL: https://github.com/apache/incubator-pulsar/pull/2625 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/buildtools/pom.xml b/buildtools/pom.xml index b81f1b02b6..8dfdf9da93 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -30,7 +30,7 @@ org.apache.pulsar buildtools - 2.2.0-incubating-SNAPSHOT + 2.2.0-SNAPSHOT jar Pulsar Build Tools diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml index dcb509596f..9d801b7489 100644 --- a/distribution/io/pom.xml +++ b/distribution/io/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar distribution -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/distribution/offloaders/pom.xml b/distribution/offloaders/pom.xml index 84349da0db..8ff06b9c85 100644 --- a/distribution/offloaders/pom.xml +++ b/distribution/offloaders/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar distribution -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/distribution/pom.xml b/distribution/pom.xml index 80134e210b..ed32b97a45 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index ca4e82134c..6a16d62084 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar distribution -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. 
diff --git a/docker/grafana/pom.xml b/docker/grafana/pom.xml index 405986a014..a3c55da160 100644 --- a/docker/grafana/pom.xml +++ b/docker/grafana/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/docker/pom.xml b/docker/pom.xml index 302bda80cc..47c837f29e 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT org.apache.pulsar docker-images diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml index 519a4980ff..d86e8a52f4 100644 --- a/docker/pulsar-all/pom.xml +++ b/docker/pulsar-all/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/docker/pulsar-standalone/pom.xml b/docker/pulsar-standalone/pom.xml index 716c14163f..15de12282e 100644 --- a/docker/pulsar-standalone/pom.xml +++ b/docker/pulsar-standalone/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml index d7a94c6fc8..ee38855d12 100644 --- a/docker/pulsar/pom.xml +++ b/docker/pulsar/pom.xml @@ -23,7 +23,7 @@ org.apache.pulsar docker-images -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT 4.0.0 org.apache.pulsar diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml index f7ed5d0e05..17908bc8da 100644 --- a/examples/flink-consumer-source/pom.xml +++ b/examples/flink-consumer-source/pom.xml @@ -24,7 +24,7 @@ org.apache.pulsar.examples pulsar-examples -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT org.apache.pulsar.examples diff --git a/examples/pom.xml b/examples/pom.xml index 753d0adad0..dddf444d35 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT org.apache.pulsar.examples diff --git 
a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml index e79c448443..6b12a60f90 100644 --- a/jclouds-shaded/pom.xml +++ b/jclouds-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/managed-ledger-shaded/pom.xml b/managed-ledger-shaded/pom.xml index 3ec3b46d55..468fe3a058 100644 --- a/managed-ledger-shaded/pom.xml +++ b/managed-ledger-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index b1fe3fa179..4b1c04d3f0 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -2.2.0-incubating-SNAPSHOT +2.2.0-SNAPSHOT .. diff --git a/pom.xml b/pom.xml index 9
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423366855 run java8 tests
[GitHub] sijie closed pull request #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema
sijie closed pull request #2624: [schema] Add ByteBuf schema and fix ByteBuffer schema URL: https://github.com/apache/incubator-pulsar/pull/2624 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java new file mode 100644 index 00..4e7e6d0d25 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +/** + * A variant `Bytes` schema that takes {@link io.netty.buffer.ByteBuf}. 
+ */ +public class ByteBufSchema implements Schema { + +public static ByteBufSchema of() { +return INSTANCE; +} + +private static final ByteBufSchema INSTANCE = new ByteBufSchema(); +private static final SchemaInfo SCHEMA_INFO = new SchemaInfo() +.setName("ByteBuf") +.setType(SchemaType.BYTES) +.setSchema(new byte[0]); + +@Override +public byte[] encode(ByteBuf message) { +if (message == null) { +return null; +} + +return ByteBufUtil.getBytes(message); +} + +@Override +public ByteBuf decode(byte[] bytes) { +if (null == bytes) { +return null; +} else { +return Unpooled.wrappedBuffer(bytes); +} +} + +@Override +public SchemaInfo getSchemaInfo() { +return SCHEMA_INFO; +} +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java index ee8ba66fa6..251cd93dcb 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java @@ -24,7 +24,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** - * A bytebuffer schema. + * A bytebuffer schema is effectively a `BYTES` schema. 
*/ public class ByteBufferSchema implements Schema { @@ -35,7 +35,7 @@ public static ByteBufferSchema of() { private static final ByteBufferSchema INSTANCE = new ByteBufferSchema(); private static final SchemaInfo SCHEMA_INFO = new SchemaInfo() .setName("ByteBuffer") -.setType(SchemaType.BYTEBUFFER) +.setType(SchemaType.BYTES) .setSchema(new byte[0]); @Override diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java index fdac5395e2..aac2d8c5c1 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java @@ -18,9 +18,11 @@ */ package org.apache.pulsar.client.schema; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; @@ -28,6 +30,7 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.ByteBufSchema; import org.apache.pulsar.client.impl.schema.ByteBufferSchema; import org.apache.pulsar.client.impl.schema.ByteSchema; import org.apache.pulsar.client.impl.
[GitHub] sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic
sijie commented on issue #2585: [tests] Make BrokerClientIntegrationTest testing behavior deterministic URL: https://github.com/apache/incubator-pulsar/pull/2585#issuecomment-423375719 run java8 tests
[GitHub] massakam removed a comment on issue #2619: Fix aggregation process of topic stats
massakam removed a comment on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423102825 rerun java8 tests
[GitHub] massakam commented on issue #2619: Fix aggregation process of topic stats
massakam commented on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423388237 rerun java8 tests
[GitHub] aahmed-se edited a comment on issue #2617: Add sample docker-compose file with pulsar standalone
aahmed-se edited a comment on issue #2617: Add sample docker-compose file with pulsar standalone URL: https://github.com/apache/incubator-pulsar/pull/2617#issuecomment-423298085 Added license and packaging
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423389115 run java8 tests
[GitHub] srkukarni opened a new pull request #2626: Make use of workerconfig defined health check interval
srkukarni opened a new pull request #2626: Make use of workerconfig defined health check interval
URL: https://github.com/apache/incubator-pulsar/pull/2626

### Motivation
Explain here the context, and why you're making that change. What is the problem you're trying to solve.

### Modifications
Describe the modifications you've done.

### Result
After your change, what will change.
[GitHub] srkukarni commented on issue #2626: Make use of workerconfig defined health check interval
srkukarni commented on issue #2626: Make use of workerconfig defined health check interval URL: https://github.com/apache/incubator-pulsar/pull/2626#issuecomment-423392400 run java8 tests
[GitHub] srkukarni commented on issue #2626: Make use of workerconfig defined health check interval
srkukarni commented on issue #2626: Make use of workerconfig defined health check interval URL: https://github.com/apache/incubator-pulsar/pull/2626#issuecomment-423394935 run java8 tests
[GitHub] srkukarni commented on issue #2626: Make use of workerconfig defined health check interval
srkukarni commented on issue #2626: Make use of workerconfig defined health check interval URL: https://github.com/apache/incubator-pulsar/pull/2626#issuecomment-423401206 run java8 tests
[GitHub] massakam commented on issue #2619: Fix aggregation process of topic stats
massakam commented on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423409438 rerun java8 tests
[GitHub] massakam removed a comment on issue #2619: Fix aggregation process of topic stats
massakam removed a comment on issue #2619: Fix aggregation process of topic stats URL: https://github.com/apache/incubator-pulsar/pull/2619#issuecomment-423388237 rerun java8 tests
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423414082 run java8 tests
[GitHub] srkukarni closed pull request #2626: Make use of workerconfig defined health check interval
srkukarni closed pull request #2626: Make use of workerconfig defined health check interval
URL: https://github.com/apache/incubator-pulsar/pull/2626

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index ddd546e1f1..f8a14ffc1a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,7 +114,7 @@ def update(self, object):


 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
@@ -138,6 +138,7 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
+    self.expected_healthcheck_interval = expected_healthcheck_interval

   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -146,12 +147,12 @@ def health_check(self):
     return health_check_result

   def process_spawner_health_check_timer(self):
-    if time.time() - self.last_health_check_ts > 90:
+    if time.time() - self.last_health_check_ts > self.expected_healthcheck_interval * 3:
       Log.critical("Haven't received health check from spawner in a while. Stopping instance...")
       os.kill(os.getpid(), signal.SIGKILL)
       sys.exit(1)

-    Timer(30, self.process_spawner_health_check_timer).start()
+    Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()

   def run(self):
     # Setup consumers and input deserializers
@@ -214,7 +215,8 @@ def run(self):

     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
-    Timer(30, self.process_spawner_health_check_timer).start()
+    if self.expected_healthcheck_interval > 0:
+      Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()

   def actual_execution(self):
     Log.info("Started Thread for executing the function")
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d9f1132bfb..2f5c8959fa 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)

   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -97,7 +98,9 @@ def main():
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 5, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id), str(args.function_version), function_details,
-                                              int(args.max_buffered_tuples), str(args.py), pulsar_client)
+                                              int(args.max_buffered_tuples),
+                                              int(args.expected_healthcheck_interval),
+                                              str(args.py), pulsar_client)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 083686b503..38a4c281e8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -92,6 +92,9 @@ @P
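
For readers following the Python part of the diff above, the watchdog behaviour it introduces can be sketched in isolation. This is only an illustration under stated assumptions, not the Pulsar implementation: the class `HealthCheckWatchdog`, its `on_timeout` callback and the injectable `clock` are hypothetical names introduced here so the logic can be exercised without killing a process. What it takes from the diff is the threshold of three times the expected interval, the re-armed `Timer`, and the guard that skips the timer when the interval is not positive.

```python
import threading
import time


class HealthCheckWatchdog(object):
    """Hypothetical helper (not part of Pulsar) mirroring the diff's logic."""

    def __init__(self, expected_interval, on_timeout, clock=time.time):
        self.expected_interval = expected_interval  # seconds between checks
        self.on_timeout = on_timeout                # stand-in for os.kill(...)
        self.clock = clock                          # injectable for testing
        self.last_health_check_ts = clock()

    def health_check(self):
        # Called whenever the spawner pings the instance.
        self.last_health_check_ts = self.clock()

    def is_stale(self):
        # Same comparison as the diff: more than 3x the expected interval.
        elapsed = self.clock() - self.last_health_check_ts
        return elapsed > self.expected_interval * 3

    def check(self):
        # One watchdog tick; returns True if the timeout callback fired.
        if self.is_stale():
            self.on_timeout()
            return True
        return False

    def start(self):
        # As in the diff, a non-positive interval disables the timer.
        if self.expected_interval <= 0:
            return None
        timer = threading.Timer(self.expected_interval, self._tick)
        timer.daemon = True
        timer.start()
        return timer

    def _tick(self):
        # Re-arm the timer only while the instance is still healthy.
        if not self.check():
            self.start()
```

With an interval of 30 seconds this reproduces the previously hard-coded values (a 30 second timer and a 90 second threshold), which is presumably why the diff multiplies by 3.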
[GitHub] sijie commented on issue #2614: Debezium: add PulsarDatabaseHistory for debezium
sijie commented on issue #2614: Debezium: add PulsarDatabaseHistory for debezium URL: https://github.com/apache/incubator-pulsar/pull/2614#issuecomment-423420561 retest this please
[GitHub] jiazhai opened a new pull request #2627: Debezium: kafka connect offset store
jiazhai opened a new pull request #2627: Debezium: kafka connect offset store
URL: https://github.com/apache/incubator-pulsar/pull/2627

### Motivation
This is based on PR #2614; please review starting from the 3rd commit. It mainly adds PulsarOffsetBackingStore.

### Modifications
Add PulsarOffsetBackingStore and a test for it.

### Result
Unit tests pass.
[GitHub] srkukarni commented on issue #2593: Add support for running python functions with wheel file
srkukarni commented on issue #2593: Add support for running python functions with wheel file URL: https://github.com/apache/incubator-pulsar/pull/2593#issuecomment-423432518 run java8 tests
[GitHub] ivankelly commented on issue #2612: Allow byte[] keys for messages (#1016)
ivankelly commented on issue #2612: Allow byte[] keys for messages (#1016) URL: https://github.com/apache/incubator-pulsar/pull/2612#issuecomment-423448735 rerun integration tests rerun java8 tests
[GitHub] sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context
sijie commented on issue #2411: [functions] change instance id from string to int and expose number of instances in context URL: https://github.com/apache/incubator-pulsar/pull/2411#issuecomment-423450196 @srkukarni We need this for the Debezium integration, so I will go ahead with this change and leave Python to a separate PR. Is that okay?
[GitHub] sijie commented on issue #2623: [tests] run docker system prune to clean up dangling network/processes
sijie commented on issue #2623: [tests] run docker system prune to clean up dangling network/processes URL: https://github.com/apache/incubator-pulsar/pull/2623#issuecomment-423458313 retest this please