[GitHub] srkukarni opened a new pull request #1701: Fix Trigger functionality for non Java Functions
srkukarni opened a new pull request #1701: Fix Trigger functionality for non Java Functions
URL: https://github.com/apache/incubator-pulsar/pull/1701

### Motivation

Non-Java functions won't have PulsarSource as their source function. Instead, we need to rely on the inputserdemap.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] srkukarni commented on issue #1701: Fix Trigger functionality for non Java Functions
srkukarni commented on issue #1701: Fix Trigger functionality for non Java Functions URL: https://github.com/apache/incubator-pulsar/pull/1701#issuecomment-385603515 @sijie @jerrypeng
[incubator-pulsar] branch master updated: Use Function class loader to resolve function types (#1699)
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new fd576ae Use Function class loader to resolve function types (#1699)
fd576ae is described below

commit fd576aea6ec221e9a729b05a19550a33d1864d03
Author: Sanjeev Kulkarni
AuthorDate: Mon Apr 30 22:23:10 2018 -0700

    Use Function class loader to resolve function types (#1699)
---
 .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index caaa7bf..9048544 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -128,7 +128,7 @@ public class PulsarSource implements Source {
     private void setupSerde() throws ClassNotFoundException {
-        Class typeArg = Class.forName(this.pulsarConfig.getTypeClassName());
+        Class typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarConfig.getTypeClassName());
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot be Void");
         }

--
To stop receiving notification emails like this one, please contact si...@apache.org.
[GitHub] sijie closed pull request #1699: Use Function class loader to resolve function types
sijie closed pull request #1699: Use Function class loader to resolve function types
URL: https://github.com/apache/incubator-pulsar/pull/1699

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index caaa7bf3f1..9048544da6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -128,7 +128,7 @@ public void close() throws Exception {
     private void setupSerde() throws ClassNotFoundException {
-        Class typeArg = Class.forName(this.pulsarConfig.getTypeClassName());
+        Class typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarConfig.getTypeClassName());
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot be Void");
         }
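The change in #1699 replaces `Class.forName(name)`, which resolves the name against the class loader of the calling class, with a lookup through the thread context class loader. The following is a minimal, self-contained sketch of that lookup pattern, not the Pulsar implementation. It assumes the runtime installs the function's class loader as the thread context class loader before `setupSerde()` runs, which the diff implies but does not show. The class `ContextLoaderSketch` and the helpers `loadViaContext` and `loadWith` are hypothetical names used only for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextLoaderSketch {

    // Resolve a type name against the thread context class loader,
    // mirroring the shape of the patched line in setupSerde().
    static Class<?> loadViaContext(String typeName) throws ClassNotFoundException {
        return Thread.currentThread().getContextClassLoader().loadClass(typeName);
    }

    // Install the given loader as the context class loader for the duration
    // of the lookup, then restore whatever was there before.
    static Class<?> loadWith(ClassLoader loader, String typeName) throws ClassNotFoundException {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return loadViaContext(typeName);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a function class loader; a real one
        // would be given the URL of the user jar instead of an empty array.
        try (URLClassLoader functionLoader =
                 new URLClassLoader(new URL[0], ContextLoaderSketch.class.getClassLoader())) {
            Class<?> type = loadWith(functionLoader, "java.lang.String");
            System.out.println(type.getName());
        }
    }
}
```

With a loader that actually points at a user jar, the same call would be able to find types that are visible only to that jar, which is the situation the PR motivation describes.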
[GitHub] jerrypeng commented on issue #1700: adding sink spec
jerrypeng commented on issue #1700: adding sink spec URL: https://github.com/apache/incubator-pulsar/pull/1700#issuecomment-385596843 @srkukarni thanks for the review. I have addressed your comments.
[GitHub] jerrypeng commented on a change in pull request #1700: adding sink spec
jerrypeng commented on a change in pull request #1700: adding sink spec
URL: https://github.com/apache/incubator-pulsar/pull/1700#discussion_r185164567

## File path: pulsar-functions/proto/src/main/proto/Function.proto ##

@@ -64,6 +65,17 @@ message SourceSpec {
 maptopicsToSerDeClassName = 4;
 }
+message SinkSpec {
+string className = 1;
+// map in json format
+string configs = 2;
+
+// configs used only when source feeds into functions
+SubscriptionType subscriptionType = 3;

Review comment: you are right. I'll remove
[GitHub] srkukarni commented on a change in pull request #1700: adding sink spec
srkukarni commented on a change in pull request #1700: adding sink spec
URL: https://github.com/apache/incubator-pulsar/pull/1700#discussion_r185160964

## File path: pulsar-functions/proto/src/main/proto/Function.proto ##

@@ -43,15 +43,16 @@ message FunctionDetails {
 string namespace = 2;
 string name = 3;
 string className = 4;
-string outputSerdeClassName = 5;
-string output = 6;
+//string outputSerdeClassName = 5;

Review comment: pl remove instead of commenting
[GitHub] srkukarni commented on a change in pull request #1700: adding sink spec
srkukarni commented on a change in pull request #1700: adding sink spec
URL: https://github.com/apache/incubator-pulsar/pull/1700#discussion_r185160956

## File path: pulsar-functions/proto/src/main/proto/Function.proto ##

@@ -64,6 +65,17 @@ message SourceSpec {
 maptopicsToSerDeClassName = 4;
 }
+message SinkSpec {
+string className = 1;
+// map in json format
+string configs = 2;
+
+// configs used only when source feeds into functions
+SubscriptionType subscriptionType = 3;

Review comment: this does not make sense for sink? Also please update the comments above
[GitHub] jerrypeng commented on issue #1700: adding sink spec
jerrypeng commented on issue #1700: adding sink spec URL: https://github.com/apache/incubator-pulsar/pull/1700#issuecomment-385584994 @sijie @srkukarni please review
[GitHub] jerrypeng opened a new pull request #1700: adding sink spec
jerrypeng opened a new pull request #1700: adding sink spec URL: https://github.com/apache/incubator-pulsar/pull/1700
[GitHub] zhaijack commented on a change in pull request #1593: PIP-17: the part of index block for offload.
zhaijack commented on a change in pull request #1593: PIP-17: the part of index block for offload.
URL: https://github.com/apache/incubator-pulsar/pull/1593#discussion_r185156310

## File path: pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java ##

@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexEntryImpl;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class OffloadIndexTest {
+
+    @Test
+    public void offloadIndexEntryImplTest() {
+        // verify OffloadIndexEntryImpl builder
+        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0);
+        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234);
+
+        // verify OffloadIndexEntryImpl get
+        assertTrue(entry1.getEntryId() == 0L);
+        assertTrue(entry1.getPartId() == 2);
+        assertTrue(entry1.getOffset() == 0L);
+
+        assertTrue(entry2.getEntryId() == 100L);
+        assertTrue(entry2.getPartId() == 3);
+        assertTrue(entry2.getOffset() == 1234L);
+    }
+
+
+    // use mock to setLastEntryId
+    class LedgerMetadataMock extends org.apache.bookkeeper.client.LedgerMetadata {
+        long lastId = 0;
+        public LedgerMetadataMock(int ensembleSize, int writeQuorumSize, int ackQuorumSize, org.apache.bookkeeper.client.BookKeeper.DigestType digestType, byte[] password, Map customMetadata, boolean storeSystemtimeAsLedgerCreationTime) {
+            super(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, customMetadata, storeSystemtimeAsLedgerCreationTime);
+        }
+
+        @Override
+        public long getLastEntryId(){
+            return lastId;
+        }
+
+        public void setLastEntryId(long lastId) {
+            this.lastId = lastId;
+        }
+    }
+
+    private LedgerMetadata createLedgerMetadata() throws Exception {
+
+        Map metadataCustom = Maps.newHashMap();
+        metadataCustom.put("key1", "value1".getBytes(UTF_8));
+        metadataCustom.put("key7", "value7".getBytes(UTF_8));
+
+        ArrayList bookies = Lists.newArrayList();
+        BookieSocketAddress BOOKIE1 = new BookieSocketAddress("127.0.0.1:3181");
+        BookieSocketAddress BOOKIE2 = new BookieSocketAddress("127.0.0.2:3181");
+        BookieSocketAddress BOOKIE3 = new BookieSocketAddress("127.0.0.3:3181");
+        bookies.add(0, BOOKIE1);
+        bookies.add(1, BOOKIE2);
+        bookies.add(2, BOOKIE3);
+
+        LedgerMetadataMock metadata = new LedgerMetadataMock(3, 3, 2,
+            DigestType.CRC32C, "password".getBytes(UTF_8), metadataCustom, false);
+
+        metadata.addEnsemble(0, bookies);
+        metadata.setLastEntryId(5000);
+        return metadata;
+    }
+
+    // prepare metadata, then use builder to build a OffloadIndexBlockImpl
+    // verify get methods, readout and fromStream methods.
+    @Test
+    public void offloadIndexBlockImplTest() throws Exception {
+        OffloadIndexBlockBuilder blockBuilder = OffloadIndexBlockBuilder.create();
+        LedgerMetadata metadata = createLedgerMetadata();
+        log.debug("created metadata: {}", metadata.toString());
+
+        blockBuilder.withMetadata(metadata);
+
+        blockBuilder.addBlock(0, 2, 0);
+        blockBuilder.addBlock(1000, 3, 64 * 1024 *
[incubator-pulsar] branch master updated: Check if javaInstance is created in the first place before invocing close (#1688)
sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 600b55d Check if javaInstance is created in the first place before invocing close (#1688)
600b55d is described below

commit 600b55d2aff1aed6427348fe50cb1d4a3bc5a809
Author: Sanjeev Kulkarni
AuthorDate: Mon Apr 30 18:57:16 2018 -0700

    Check if javaInstance is created in the first place before invocing close (#1688)
---
 .../org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e181204..a4fe026 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -348,7 +348,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     @Override
     public void close() {
         processor.close();
-        javaInstance.close();
+        if (null != javaInstance) {
+            javaInstance.close();
+        }
         // kill the state table
         if (null != stateTable) {
[GitHub] sijie closed pull request #1688: Check if javaInstance is created in the first place before invocing close
sijie closed pull request #1688: Check if javaInstance is created in the first place before invocing close
URL: https://github.com/apache/incubator-pulsar/pull/1688

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d3853e2535..f57b604782 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -348,7 +348,9 @@ private void sendOutputMessage(Record srcRecord,
     @Override
     public void close() {
         processor.close();
-        javaInstance.close();
+        if (null != javaInstance) {
+            javaInstance.close();
+        }
         // kill the state table
         if (null != stateTable) {
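The guard added in #1688 matters when `close()` can run before setup has finished, for example when setup fails early and the owner still closes the runnable. The sketch below illustrates that pattern in isolation, under the assumption that the field is only assigned during setup. `GuardedCloseSketch`, `Resource`, `setup`, and `instanceClosed` are hypothetical names and are not taken from the Pulsar code base.

```java
public class GuardedCloseSketch implements AutoCloseable {

    // Hypothetical stand-in for a resource that is created lazily during setup.
    static final class Resource {
        private boolean closed = false;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Stays null if setup() never ran or failed before reaching the assignment.
    private Resource instance;

    void setup() {
        instance = new Resource();
    }

    // True only if the resource was created and has since been closed.
    boolean instanceClosed() {
        return instance != null && instance.isClosed();
    }

    @Override
    public void close() {
        // Same shape as the patched code: skip the call when nothing was created.
        if (null != instance) {
            instance.close();
        }
    }

    public static void main(String[] args) {
        GuardedCloseSketch neverStarted = new GuardedCloseSketch();
        neverStarted.close(); // returns normally, no NullPointerException

        GuardedCloseSketch started = new GuardedCloseSketch();
        started.setup();
        started.close();
        System.out.println(started.instanceClosed());
    }
}
```

Without the null check, the first `close()` call in `main` would throw a `NullPointerException` and mask whatever error caused setup to stop early.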
[GitHub] sijie closed pull request #1696: Allow functions to be triggered without specifying topic name
sijie closed pull request #1696: Allow functions to be triggered without specifying topic name
URL: https://github.com/apache/incubator-pulsar/pull/1696

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80e98af1a5..c22b6117b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,12 +237,12 @@ public Response getAssignments() {
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String triggerValue,
-                                    final @FormDataParam("dataStream") InputStream triggerStream) {
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
         return functions.triggerFunction(
-                tenant, namespace, functionName, topic, triggerValue, triggerStream);
+                tenant, namespace, functionName, triggerValue, triggerStream, topic);
     }

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 8f058c7530..9a774956b2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -157,7 +157,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;
+    String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;

     /**
      * Upload Data.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 30e6bd9d52..9d5b8eec5f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -149,7 +149,7 @@ public void updateFunction(FunctionDetails functionDetails, String fileName) thr
     }

     @Override
-    public String triggerFunction(String tenant, String namespace, String functionName, String triggerValue, String triggerFile) throws PulsarAdminException {
+    public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
             if (triggerFile != null) {
@@ -160,9 +160,11 @@ public String triggerFunction(String tenant, String namespace, String functionNa
             if (triggerValue != null) {
                 mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
             }
-            String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
+            if (topic != null && !topic.isEmpty()) {
+                mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+            }
+            return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
                     .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
-            return response;
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e2f0a7172a..8cc1e2308f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -776,12 +776,14 @@ void runCmd() throws Exception {
[incubator-pulsar] branch master updated: Allow functions to be triggered without specifying topic name (#1696)
sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new fb7198a Allow functions to be triggered without specifying topic name (#1696)
fb7198a is described below

commit fb7198a669e0647cf3213739b7b6ffb8c650d978
Author: Sanjeev Kulkarni
AuthorDate: Mon Apr 30 18:56:25 2018 -0700

    Allow functions to be triggered without specifying topic name (#1696)

    * Re-added trigger functionality with no need for topic name
    * Unified paths
    * Fix
---
 .../pulsar/broker/admin/impl/FunctionsBase.java | 6 +++---
 .../org/apache/pulsar/client/admin/Functions.java | 2 +-
 .../pulsar/client/admin/internal/FunctionsImpl.java | 8 +---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java | 4 +++-
 .../functions/worker/rest/api/FunctionsImpl.java | 21 -
 .../worker/rest/api/v2/FunctionApiV2Resource.java | 6 +++---
 6 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80e98af..c22b611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,12 +237,12 @@ public class FunctionsBase extends AdminResource implements Supplier
[GitHub] srkukarni commented on issue #1699: Use Function class loader to resolve function types
srkukarni commented on issue #1699: Use Function class loader to resolve function types URL: https://github.com/apache/incubator-pulsar/pull/1699#issuecomment-385580219 @jerrypeng @sijie
[GitHub] srkukarni opened a new pull request #1699: Use Function class loader to resolve function types
srkukarni opened a new pull request #1699: Use Function class loader to resolve function types
URL: https://github.com/apache/incubator-pulsar/pull/1699

### Motivation

Since user types for function could involve user jar, always use function class loader to resolve function types.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.
[GitHub] srkukarni closed pull request #1698: Use Function ClassLoader to load types
srkukarni closed pull request #1698: Use Function ClassLoader to load types
URL: https://github.com/apache/incubator-pulsar/pull/1698

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e181204189..85cce9a242 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -159,7 +159,7 @@ JavaInstance setupJavaInstance() throws Exception {
         // start the output producer
         processor.setupOutput(outputSerDe);
         // start the input consumer
-        processor.setupInput(typeArgs[0]);
+        processor.setupInput(typeArgs[0], clsLoader);
         // start any log topic handler
         setupLogHandler();

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 0dcf12c556..6baa113e8a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -63,7 +63,7 @@ static MessageProcessor create(PulsarClient client,
      * @param inputType the input type of the function
      * @throws Exception
      */
-    void setupInput(Class inputType)
+    void setupInput(Class inputType, ClassLoader clsLoader)
         throws Exception;

     /**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index a2a5d8bc8a..ff57bd8744 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -61,7 +61,7 @@ protected MessageProcessorBase(PulsarClient client,
     //

     @Override
-    public void setupInput(Class inputType) throws Exception {
+    public void setupInput(Class inputType, ClassLoader clsLoader) throws Exception {

         org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource();
         Object object;
@@ -75,6 +75,7 @@ public void setupInput(Class inputType) throws Exception {
             pulsarConfig.setSubscriptionType(
                     FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
             pulsarConfig.setTypeClassName(inputType.getName());
+            pulsarConfig.setClsLoader(clsLoader);

             Object[] params = {this.client, pulsarConfig};
             Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
index 2a5dc44d32..d33fc057ff 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
@@ -40,6 +40,7 @@
     private String subscriptionName;
     private Map topicSerdeClassNameMap;
     private String typeClassName;
+    private ClassLoader clsLoader;

     public static PulsarConfig load(Map map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index caaa7bf3f1..dae6dfbce1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -128,7 +128,7 @@ public void close() throws Exception {
     private void setupSerde() throws ClassNotFoundException {
-        Class typeArg = Class.forName(this.pulsarConfig.getTypeClassName());
+        Class typeArg = pulsarConfig.getClsLoader().loadClass(pulsarConfig.getTypeClassName());
         if
[GitHub] srkukarni commented on issue #1698: Use Function ClassLoader to load types
srkukarni commented on issue #1698: Use Function ClassLoader to load types URL: https://github.com/apache/incubator-pulsar/pull/1698#issuecomment-385579728 @jerrypeng @sijie
[GitHub] srkukarni opened a new pull request #1698: Use Function ClassLoader to load types
srkukarni opened a new pull request #1698: Use Function ClassLoader to load types
URL: https://github.com/apache/incubator-pulsar/pull/1698

### Motivation

Since user types for function could involve user jar, always use function class loader to resolve function types.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.
[GitHub] jerrypeng commented on issue #1697: adding function worker initialized check
jerrypeng commented on issue #1697: adding function worker initialized check URL: https://github.com/apache/incubator-pulsar/pull/1697#issuecomment-385569773 @sijie @srkukarni please review
[GitHub] jerrypeng opened a new pull request #1697: adding function worker initialized check
jerrypeng opened a new pull request #1697: adding function worker initialized check URL: https://github.com/apache/incubator-pulsar/pull/1697
[GitHub] sijie closed pull request #1693: Make SchemaSerializationException an unchecked exception
sijie closed pull request #1693: Make SchemaSerializationException an unchecked exception
URL: https://github.com/apache/incubator-pulsar/pull/1693

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 09fcc10bc1..726ca78367 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -240,15 +240,13 @@ private int buildMessage(TypedMessageBuilder builder, ProducerRecord
[incubator-pulsar] branch master updated: Make SchemaSerializationException an unchecked exception (#1693)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new cbdce09 Make SchemaSerializationException an unchecked exception (#1693) cbdce09 is described below commit cbdce09db4696b77201368e35e26bb8b5b24dd1e Author: Matteo MerliAuthorDate: Mon Apr 30 17:28:03 2018 -0700 Make SchemaSerializationException an unchecked exception (#1693) --- .../org/apache/kafka/clients/producer/PulsarKafkaProducer.java| 8 +++- .../main/java/org/apache/pulsar/client/api/MessageBuilder.java| 2 +- .../src/main/java/org/apache/pulsar/client/api/Schema.java| 2 +- .../apache/pulsar/client/api/SchemaSerializationException.java| 2 +- .../java/org/apache/pulsar/client/api/TypedMessageBuilder.java| 2 +- .../java/org/apache/pulsar/client/impl/MessageBuilderImpl.java| 7 +++ .../org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java| 4 +--- 7 files changed, 11 insertions(+), 16 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 09fcc10..726ca78 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -240,15 +240,13 @@ public class PulsarKafkaProducer implements Producer { if (record.key() != null) { builder.key(getKey(record.topic(), record.key())); } + if (record.timestamp() != null) { builder.eventTime(record.timestamp()); } + byte[] value = valueSerializer.serialize(record.topic(), record.value()); -try { -builder.value(value); -} catch (SchemaSerializationException e) { -throw new 
RuntimeException(e); -} +builder.value(value); return value.length; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java index cf08d07..6054dfe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java @@ -60,7 +60,7 @@ public interface MessageBuilder { * @param value *the domain object */ -MessageBuilder setValue(T value) throws SchemaSerializationException; +MessageBuilder setValue(T value); /** * Set the content of the message diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index 81d6233..12c35c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -36,7 +36,7 @@ public interface Schema { * @throws SchemaSerializationException * if the serialization fails */ -byte[] encode(T message) throws SchemaSerializationException; +byte[] encode(T message); /** * Decode a byte array into an object using the schema definition and deserializer implementation diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java index e31c4cf..39248d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.api; -public class SchemaSerializationException extends PulsarClientException { +public class SchemaSerializationException extends RuntimeException { public SchemaSerializationException(Throwable cause) { super(cause); 
} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java index 7a915c6..72cbbc7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java @@ -95,7 +95,7 @@ public interface TypedMessageBuilder extends Serializable { * @param value *the domain object */ -TypedMessageBuilder value(T value) throws
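The effect of this change on calling code can be illustrated with a minimal, self-contained sketch. The class and method names below (`UncheckedSchemaSketch`, `SerializationFailure`, `encode`, `buildMessage`) are hypothetical stand-ins, not the Pulsar client API; the only point is that once the exception extends RuntimeException, the encoder needs no `throws` clause and the caller needs no try/catch wrapper.

```java
import java.nio.charset.StandardCharsets;

public class UncheckedSchemaSketch {

    // Hypothetical stand-in for SchemaSerializationException after the change:
    // it extends RuntimeException, so it is unchecked.
    static class SerializationFailure extends RuntimeException {
        SerializationFailure(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical encoder: no "throws" clause is needed for an unchecked exception.
    static byte[] encode(String message) {
        if (message == null) {
            throw new SerializationFailure(new IllegalArgumentException("message is null"));
        }
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // The caller is a plain call, similar in spirit to the simplified buildMessage in the diff.
    static int buildMessage(String value) {
        byte[] payload = encode(value);
        return payload.length;
    }

    public static void main(String[] args) {
        System.out.println(buildMessage("hello"));
    }
}
```

Callers that still want to handle serialization failures can catch the exception explicitly; it is simply no longer forced on them by the compiler.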
[GitHub] merlimat commented on issue #1650: Documentation of admin REST API is missing body parameters
merlimat commented on issue #1650: Documentation of admin REST API is missing body parameters URL: https://github.com/apache/incubator-pulsar/issues/1650#issuecomment-385565942 Moved to 2.1 since it requires a bit of work and investigation with Swagger to include the JSON format for the requests. (In any case, this is only on the website, so it's not technically "tied" to the release tag.)
[GitHub] merlimat closed pull request #1690: Fix Golang setup in Dockerfile
merlimat closed pull request #1690: Fix Golang setup in Dockerfile
URL: https://github.com/apache/incubator-pulsar/pull/1690

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index e94837546b..7a1cfff59f 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -23,8 +23,8 @@ FROM ubuntu:16.04
 RUN mkdir /pulsar
 ADD protobuf.patch /pulsar
-RUN apt-get update
-RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
+RUN apt-get update && \
+    apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
     liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
     libjsoncpp-dev libxml2-utils protobuf-compiler wget \
     curl doxygen openjdk-8-jdk-headless clang-format-5.0 \
@@ -54,7 +54,9 @@ RUN rvm install 2.4.1
 RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py
 RUN pip install pdoc
-# Install Protobuf doc generator
+# Install Protobuf doc generator (requires Go)
+ENV GOPATH "$HOME/go"
+ENV PATH "/usr/lib/go-1.10/bin:$GOPATH/bin:$PATH"
 RUN go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc
 # Build the patched protoc
[incubator-pulsar] branch master updated: Fix Golang setup in Dockerfile (#1690)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new f5d9502 Fix Golang setup in Dockerfile (#1690)
f5d9502 is described below

commit f5d95028dc3953c1ed89fb0a114b18528da76715
Author: Luc Perkins
AuthorDate: Mon Apr 30 17:04:10 2018 -0700

    Fix Golang setup in Dockerfile (#1690)

    * When preparing pulsar-build Docker image, ensure apt-get update is not cached
    * add Go setup to Dockerfile for website build
    * update PATH
    * set proper env vars
    * re-add golang install to main apt-get statement
---
 build/docker/Dockerfile | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index 35d04a5..7a1cfff 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -54,7 +54,9 @@ RUN rvm install 2.4.1
 RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py
 RUN pip install pdoc
-# Install Protobuf doc generator
+# Install Protobuf doc generator (requires Go)
+ENV GOPATH "$HOME/go"
+ENV PATH "/usr/lib/go-1.10/bin:$GOPATH/bin:$PATH"
 RUN go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc
 # Build the patched protoc

--
To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] merlimat closed pull request #1695: Remove "global" from returned clusters list
merlimat closed pull request #1695: Remove "global" from returned clusters list
URL: https://github.com/apache/incubator-pulsar/pull/1695

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04389cc98a..be1b7dde12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -334,7 +335,11 @@ protected ZooKeeperChildrenCache managedLedgerListCache() {
     protected Set clusters() {
         try {
-            return pulsar().getConfigurationCache().clustersListCache().get();
+            Set clusters = pulsar().getConfigurationCache().clustersListCache().get();
+
+            // Remove "global" cluster from returned list
+            clusters.remove(Constants.GLOBAL_CLUSTER);
+            return clusters;
         } catch (Exception e) {
             throw new RestException(e);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 5c9346e22c..73dc79c1b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -869,4 +869,10 @@ public void brokerNamespaceIsolationPolicies() throws Exception {
         } catch (PulsarAdminException.NotFoundException e) {// expected
         }
     }
+
+    @Test
+    public void clustersList() throws PulsarAdminException {
+        final String cluster = pulsar.getConfiguration().getClusterName();
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList(cluster));
+    }
 }
[incubator-pulsar] branch master updated: Remove "global" from returned clusters list (#1695)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 1e0e12a Remove "global" from returned clusters list (#1695)
1e0e12a is described below

commit 1e0e12a72fb978abc04797b049dada100d8d0832
Author: Matteo Merli
AuthorDate: Mon Apr 30 17:03:54 2018 -0700

    Remove "global" from returned clusters list (#1695)
---
 .../main/java/org/apache/pulsar/broker/admin/AdminResource.java | 7 ++-
 .../test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java | 6 ++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04389cc..be1b7dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
@@ -334,7 +335,11 @@ public abstract class AdminResource extends PulsarWebResource {
     protected Set clusters() {
         try {
-            return pulsar().getConfigurationCache().clustersListCache().get();
+            Set clusters = pulsar().getConfigurationCache().clustersListCache().get();
+
+            // Remove "global" cluster from returned list
+            clusters.remove(Constants.GLOBAL_CLUSTER);
+            return clusters;
         } catch (Exception e) {
             throw new RestException(e);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 5c9346e..73dc79c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -869,4 +869,10 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException.NotFoundException e) {// expected
         }
     }
+
+    @Test
+    public void clustersList() throws PulsarAdminException {
+        final String cluster = pulsar.getConfiguration().getClusterName();
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList(cluster));
+    }
 }
[GitHub] srkukarni commented on issue #1696: Allow functions to be triggered without specifying topic name
srkukarni commented on issue #1696: Allow functions to be triggered without specifying topic name URL: https://github.com/apache/incubator-pulsar/pull/1696#issuecomment-385563329 @jerrypeng @sijie
[GitHub] srkukarni opened a new pull request #1696: Trigger fix
srkukarni opened a new pull request #1696: Trigger fix
URL: https://github.com/apache/incubator-pulsar/pull/1696

### Motivation

Adds trigger functionality that does not require specifying the input topic. This works for functions that take only one input Pulsar topic.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.
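The rule described in the motivation (the topic may be omitted only when the function has exactly one input topic) can be sketched as below. This is not the code from the pull request, whose diff is not included in this message; `TriggerTopicSketch` and `resolveTopic` are hypothetical names used only to illustrate the idea.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class TriggerTopicSketch {

    // Hypothetical helper: choose the topic a trigger request should be sent to.
    // "requested" may be null or empty when the caller did not name a topic.
    static String resolveTopic(String requested, Collection<String> inputTopics) {
        if (requested != null && !requested.isEmpty()) {
            if (!inputTopics.contains(requested)) {
                throw new IllegalArgumentException("Not an input topic of this function: " + requested);
            }
            return requested;
        }
        // No topic given: only unambiguous when there is exactly one input topic.
        if (inputTopics.size() == 1) {
            return inputTopics.iterator().next();
        }
        throw new IllegalArgumentException("Topic must be specified for functions with "
                + inputTopics.size() + " input topics");
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic(null, Collections.singletonList("in-1")));
        System.out.println(resolveTopic("in-2", Arrays.asList("in-1", "in-2")));
    }
}
```

Rejecting the request when several input topics exist keeps the behaviour explicit instead of silently picking one of them.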
[GitHub] merlimat commented on issue #1352: Delete inactive subscriptions automatically
merlimat commented on issue #1352: Delete inactive subscriptions automatically URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-385560450 Merged with current master and fixed conflicts.
[GitHub] merlimat commented on issue #1656: Add admin api to delete topic forcefully
merlimat commented on issue #1656: Add admin api to delete topic forcefully URL: https://github.com/apache/incubator-pulsar/pull/1656#issuecomment-385559863 retest this please
[GitHub] sijie commented on issue #1657: Document default values for Pulsar Functions
sijie commented on issue #1657: Document default values for Pulsar Functions URL: https://github.com/apache/incubator-pulsar/pull/1657#issuecomment-385557002 @srkukarni: I think @lucperkins addressed your comments. Can you review it again?
[incubator-pulsar] branch master updated: Key the download directory by the instance id (#1691)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 3c99306 Key the download directory by the instance id (#1691)
3c99306 is described below

commit 3c99306f2e3e95e9070ea47911fbd6855d860261
Author: Sanjeev Kulkarni
AuthorDate: Mon Apr 30 16:07:09 2018 -0700

    Key the download directory by the instance id (#1691)
---
 .../java/org/apache/pulsar/functions/worker/FunctionActioner.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index d7bbc96..e6821f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -108,7 +108,7 @@ public class FunctionActioner implements AutoCloseable {
                 functionMetaData.getFunctionDetails().getName(), instance.getInstanceId());
         File pkgDir = new File(
                 workerConfig.getDownloadDirectory(),
-                getDownloadPackagePath(functionMetaData));
+                getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
         pkgDir.mkdirs();
         int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
@@ -184,7 +184,7 @@ public class FunctionActioner implements AutoCloseable {
         // clean up function package
         File pkgDir = new File(
                 workerConfig.getDownloadDirectory(),
-                getDownloadPackagePath(functionMetaData));
+                getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
         if (pkgDir.exists()) {
             try {
@@ -196,12 +196,13 @@ public class FunctionActioner implements AutoCloseable {
         }
     }
-    private String getDownloadPackagePath(FunctionMetaData functionMetaData) {
+    private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
         return StringUtils.join(
                 new String[]{
                         functionMetaData.getFunctionDetails().getTenant(),
                         functionMetaData.getFunctionDetails().getNamespace(),
                         functionMetaData.getFunctionDetails().getName(),
+                        Integer.toString(instanceId),
                 },
                 File.separatorChar);
     }
[GitHub] merlimat closed pull request #1691: Key the download directory by the instance id
merlimat closed pull request #1691: Key the download directory by the instance id
URL: https://github.com/apache/incubator-pulsar/pull/1691

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index d7bbc96618..e6821f67f1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -108,7 +108,7 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
                 functionMetaData.getFunctionDetails().getName(), instance.getInstanceId());
         File pkgDir = new File(
                 workerConfig.getDownloadDirectory(),
-                getDownloadPackagePath(functionMetaData));
+                getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
         pkgDir.mkdirs();
         int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
@@ -184,7 +184,7 @@ private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         // clean up function package
         File pkgDir = new File(
                 workerConfig.getDownloadDirectory(),
-                getDownloadPackagePath(functionMetaData));
+                getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
         if (pkgDir.exists()) {
             try {
@@ -196,12 +196,13 @@ private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         }
     }
-    private String getDownloadPackagePath(FunctionMetaData functionMetaData) {
+    private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
         return StringUtils.join(
                 new String[]{
                         functionMetaData.getFunctionDetails().getTenant(),
                         functionMetaData.getFunctionDetails().getNamespace(),
                         functionMetaData.getFunctionDetails().getName(),
+                        Integer.toString(instanceId),
                 },
                 File.separatorChar);
     }
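To show why the extra path component matters, here is a small standalone sketch of the path construction. It uses the JDK's `String.join` rather than the `StringUtils.join` call in the diff, and the class name `DownloadPathSketch`, the method `packagePath`, and the sample tenant/namespace/function values are all hypothetical.

```java
import java.io.File;

public class DownloadPathSketch {

    // Build tenant/namespace/name/instanceId using the platform separator,
    // so each instance of the same function gets its own directory.
    static String packagePath(String tenant, String namespace, String name, int instanceId) {
        return String.join(File.separator, tenant, namespace, name, Integer.toString(instanceId));
    }

    public static void main(String[] args) {
        // Two instances of the same function no longer share a directory.
        System.out.println(packagePath("public", "default", "exclaim", 0));
        System.out.println(packagePath("public", "default", "exclaim", 1));
    }
}
```

With the instance id in the path, cleaning up the package directory of one stopped instance presumably cannot delete files that another instance of the same function is still using.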
[incubator-pulsar] branch master updated: Renamed Schema.IDENTITY into Schema.BYTES (#1694)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1eff40e Renamed Schema.IDENTITY into Schema.BYTES (#1694) 1eff40e is described below commit 1eff40ebf280cc9205f2370fde8a2fd8a74dcaf3 Author: Matteo MerliAuthorDate: Mon Apr 30 15:59:29 2018 -0700 Renamed Schema.IDENTITY into Schema.BYTES (#1694) --- .../apache/pulsar/client/impl/RawReaderImpl.java | 2 +- .../pulsar/client/admin/internal/TopicsImpl.java | 4 +-- .../apache/pulsar/client/api/MessageBuilder.java | 2 +- .../java/org/apache/pulsar/client/api/Schema.java | 23 -- .../org/apache/pulsar/client/impl/MessageImpl.java | 2 +- .../pulsar/client/impl/PulsarClientImpl.java | 14 - .../schema/BytesSchema.java} | 36 ++ .../{api/schemas => impl/schema}/StringSchema.java | 2 +- .../pulsar/client/schemas/DefaultSchemasTest.java | 2 +- 9 files changed, 30 insertions(+), 57 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 4a91477..e768c3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -103,7 +103,7 @@ public class RawReaderImpl implements RawReader { RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, CompletableFuture > consumerFuture) { super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1, -consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY); +consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); } diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 8749d6a..0ccb16e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -851,7 +851,7 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics } } -return Collections.singletonList(new MessageImpl (msgId, properties, data, Schema.IDENTITY)); +return Collections.singletonList(new MessageImpl (msgId, properties, data, Schema.BYTES)); } finally { if (stream != null) { stream.close(); @@ -876,7 +876,7 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics properties.put(entry.getKey(), entry.getValue()); } } -ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.IDENTITY)); +ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.BYTES)); } catch (Exception ex) { log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java index 6839341..cf08d07 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java @@ -44,7 +44,7 @@ public interface MessageBuilder { } static MessageBuilder create() { -return create(Schema.IDENTITY); +return create(Schema.BYTES); } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index c5ff14d..81d6233 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,7 +18,8 @@ */ package org.apache.pulsar.client.api; -import org.apache.pulsar.client.api.schemas.StringSchema; +import org.apache.pulsar.client.impl.schema.BytesSchema; +import org.apache.pulsar.client.impl.schema.StringSchema; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -51,22 +52,10 @@ public interface Schema { */ SchemaInfo getSchemaInfo(); -
[GitHub] merlimat closed pull request #1694: Renamed Schema.IDENTITY into Schema.BYTES
merlimat closed pull request #1694: Renamed Schema.IDENTITY into Schema.BYTES URL: https://github.com/apache/incubator-pulsar/pull/1694 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 4a914778fd..e768c3e463 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -103,7 +103,7 @@ public String toString() { RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationDataconf, CompletableFuture > consumerFuture) { super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1, -consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY); +consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 8749d6ae6f..0ccb16edec 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -851,7 +851,7 @@ private TopicName validateTopic(String topic) { } } -return Collections.singletonList(new MessageImpl (msgId, properties, data, Schema.IDENTITY)); +return Collections.singletonList(new MessageImpl (msgId, properties, data, Schema.BYTES)); } finally { if 
(stream != null) { stream.close(); @@ -876,7 +876,7 @@ private TopicName validateTopic(String topic) { properties.put(entry.getKey(), entry.getValue()); } } -ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.IDENTITY)); +ret.add(new MessageImpl<>(batchMsgId, properties, singleMessagePayload, Schema.BYTES)); } catch (Exception ex) { log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java index 6839341eae..cf08d074d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java @@ -44,7 +44,7 @@ } static MessageBuilder create() { -return create(Schema.IDENTITY); +return create(Schema.BYTES); } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index c5ff14d92d..81d6233a99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,7 +18,8 @@ */ package org.apache.pulsar.client.api; -import org.apache.pulsar.client.api.schemas.StringSchema; +import org.apache.pulsar.client.impl.schema.BytesSchema; +import org.apache.pulsar.client.impl.schema.StringSchema; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -51,22 +52,10 @@ */ SchemaInfo getSchemaInfo(); -Schema IDENTITY = new Schema () { -@Override -public byte[] encode(byte[] message) { -return message; -} - -@Override -public byte[] decode(byte[] bytes) { -return bytes; -} - -@Override -public SchemaInfo getSchemaInfo() { -return null; -} -}; +/** + * Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through. 
+ */ +Schema BYTES = new BytesSchema(); /** * Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index e02f9f68bb..7001530b59
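The pass-through behaviour documented for `Schema.BYTES` can be sketched in isolation as follows. The interface `SimpleSchema` is a hypothetical, reduced stand-in for the client's `Schema` interface (it omits `getSchemaInfo`), and the body of the real `BytesSchema` class is not shown in this message, so treat this as an assumption about its shape based on the removed `IDENTITY` implementation.

```java
import java.util.Arrays;

public class BytesSchemaSketch {

    // Hypothetical, reduced stand-in for the client's Schema interface.
    interface SimpleSchema<T> {
        byte[] encode(T message);

        T decode(byte[] bytes);
    }

    // Pass-through schema: the payload is returned untouched in both directions.
    static final SimpleSchema<byte[]> BYTES = new SimpleSchema<byte[]>() {
        @Override
        public byte[] encode(byte[] message) {
            return message;
        }

        @Override
        public byte[] decode(byte[] bytes) {
            return bytes;
        }
    };

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        System.out.println(Arrays.toString(BYTES.decode(BYTES.encode(payload))));
    }
}
```

Because nothing is copied, the caller and the schema share the same array; code that mutates the payload after encoding would need its own defensive copy.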
[GitHub] merlimat commented on issue #1690: Fix Golang setup in Dockerfile
merlimat commented on issue #1690: Fix Golang setup in Dockerfile URL: https://github.com/apache/incubator-pulsar/pull/1690#issuecomment-385552629 retest this please
[GitHub] merlimat opened a new pull request #1695: Remove "global" from returned clusters list
merlimat opened a new pull request #1695: Remove "global" from returned clusters list
URL: https://github.com/apache/incubator-pulsar/pull/1695

### Motivation

The `global` cluster is still returned by the clusters list command. Since it was an artificial placeholder that is no longer needed, we should prune it from the clusters list to avoid confusion.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
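The pruning described in this message can be sketched as follows. This is only an illustration, assuming the cluster names are available as a plain set of strings; `ClusterListFilter` and `withoutGlobal` are hypothetical names and are not taken from the pull request.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not the code changed in the pull request.
class ClusterListFilter {
    // Name of the artificial placeholder cluster mentioned in the PR description.
    static final String GLOBAL_CLUSTER = "global";

    // Returns a sorted copy of the cluster names without the placeholder entry.
    // The input set is left unmodified.
    static Set<String> withoutGlobal(Set<String> clusters) {
        Set<String> result = new TreeSet<>(clusters);
        result.remove(GLOBAL_CLUSTER);
        return result;
    }
}
```

Calling `ClusterListFilter.withoutGlobal(...)` on a set holding `us-west`, `global` and `us-east` would leave only the two real cluster names.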
[GitHub] sijie closed pull request #1681: additional refactoring to use source interface
sijie closed pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 832cb6630e..7531742fec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -146,11 +146,12 @@ public Response getAssignments() { public Response triggerFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, +final @PathParam("topic") String topic, final @FormDataParam("data") String triggerValue, final @FormDataParam("dataStream") InputStream triggerStream) { return functions.triggerFunction( -tenant, namespace, functionName, triggerValue, triggerStream); +tenant, namespace, functionName, topic, triggerValue, triggerStream); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 92bd757af3..e2f0a7172a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -23,12 +23,15 @@ import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.ObjectOutputStream; 
import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -49,7 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.instance.PulsarSource; +import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; @@ -59,8 +62,11 @@ import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil; import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled; -import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails; +import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType; +import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; @@ -323,7 +329,7 @@ void processArguments() throws Exception { } private void doJavaSubmitChecks(FunctionConfig functionConfig) { -if (isNull(className)) { +if (isNull(functionConfig.getClassName())) { throw new IllegalArgumentException("You supplied a jar file but no main class"); } @@ -527,6 +533,68 @@ private String getUniqueInput(FunctionConfig functionConfig) { return functionConfig.getCustomSerdeInputs().keySet().iterator().next(); } } + +protected FunctionDetails convert(FunctionConfig functionConfig) +throws IOException { 
+FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + +// Setup source +Map<String, String> topicToSerDeClassNameMap = new HashMap<>(); + topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs()); +SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); +if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { +sourceSpecBuilder.setClassName(PulsarSource.class.getName()); +} +
[incubator-pulsar] branch master updated: additional refactoring to use source interface (#1681)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e9a1b9a additional refactoring to use source interface (#1681) e9a1b9a is described below commit e9a1b9a58b28687558b29705ea66047ef82fd05c Author: Boyang Jerry PengAuthorDate: Mon Apr 30 15:21:50 2018 -0700 additional refactoring to use source interface (#1681) * additional refactoring to use source interface * removing PulsarConstants * remove unnecessary import * removing sink message for now * addressing comments * adding null check --- .../pulsar/broker/admin/impl/FunctionsBase.java| 3 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 148 +- .../org/apache/pulsar/functions/api/Context.java | 3 +- .../pulsar/functions/instance/ContextImpl.java | 2 +- .../pulsar/functions/instance/JavaInstance.java| 3 +- .../functions/instance/JavaInstanceRunnable.java | 29 +- .../instance/processors/AtLeastOnceProcessor.java | 5 +- .../instance/processors/AtMostOnceProcessor.java | 5 +- .../processors/EffectivelyOnceProcessor.java | 12 +- .../instance/processors/MessageProcessor.java | 28 +- .../instance/processors/MessageProcessorBase.java | 50 ++-- .../{instance => source}/PulsarConfig.java | 26 +- .../{instance => source}/PulsarRecord.java | 2 +- .../{instance => source}/PulsarSource.java | 63 - .../instance/src/main/python/Function_pb2.py | 305 ++--- .../instance/src/main/python/python_instance.py| 26 +- .../src/main/python/python_instance_main.py| 35 ++- .../instance/JavaInstanceRunnableTest.java | 45 --- .../functions/instance/JavaInstanceTest.java | 4 +- .../pulsar/functions/source/PulsarSourceTest.java | 136 + .../proto/src/main/proto/Function.proto| 52 ++-- .../functions/proto/FunctionDetailsTest.java | 2 +- .../pulsar/functions/runtime/JavaInstanceMain.java | 56 ++-- .../pulsar/functions/runtime/ProcessRuntime.java | 
55 +--- .../functions/runtime/ProcessRuntimeTest.java | 31 ++- .../pulsar/functions/utils/FunctionConfig.java | 12 +- .../functions/worker/rest/api/FunctionsImpl.java | 25 +- .../worker/rest/api/v2/FunctionApiV2Resource.java | 3 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 278 --- 29 files changed, 731 insertions(+), 713 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index e5c050e..80e98af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -237,11 +237,12 @@ public class FunctionsBase extends AdminResource implements Supplier topicToSerDeClassNameMap = new HashMap<>(); + topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs()); +SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); +if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { +sourceSpecBuilder.setClassName(PulsarSource.class.getName()); +} +functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, "")); + sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap); +if (functionConfig.getSubscriptionType() != null) { +sourceSpecBuilder + .setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType())); +} +functionDetailsBuilder.setSource(sourceSpecBuilder); + +if (functionConfig.getTenant() != null) { +functionDetailsBuilder.setTenant(functionConfig.getTenant()); +} +if (functionConfig.getNamespace() != null) { + functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); +} +if (functionConfig.getName() != null) { +functionDetailsBuilder.setName(functionConfig.getName()); +} +if (functionConfig.getClassName() != null) { + functionDetailsBuilder.setClassName(functionConfig.getClassName()); +} +if (functionConfig.getOutput() != 
null) { +functionDetailsBuilder.setOutput(functionConfig.getOutput()); +} +if (functionConfig.getOutputSerdeClassName() != null) { + functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName()); +} +if
[GitHub] merlimat opened a new pull request #1694: Renamed Schema.IDENTITY into Schema.BYTES
merlimat opened a new pull request #1694: Renamed Schema.IDENTITY into Schema.BYTES
URL: https://github.com/apache/incubator-pulsar/pull/1694

### Motivation

`Schema.BYTES` seems to resonate more with people than `Schema.IDENTITY`, so this renames it before it is released.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
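For readers unfamiliar with what a pass-through bytes schema does, here is a minimal sketch. It assumes a simplified `SimpleSchema<T>` interface invented for this example; the real `org.apache.pulsar.client.api.Schema` also exposes `getSchemaInfo()`, and the class names below are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for a schema interface (assumption for this sketch).
interface SimpleSchema<T> {
    byte[] encode(T message);
    T decode(byte[] bytes);
}

// Pass-through schema: the payload is returned unchanged in both directions.
class PassThroughBytesSchema implements SimpleSchema<byte[]> {
    @Override
    public byte[] encode(byte[] message) {
        return message;
    }

    @Override
    public byte[] decode(byte[] bytes) {
        return bytes;
    }
}

// String schema shown for contrast: values are converted to and from UTF-8 bytes.
class Utf8StringSchema implements SimpleSchema<String> {
    @Override
    public byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The name `BYTES` describes the value type the schema accepts, in the same way a string schema is named after `String`, which may be why it reads more naturally than `IDENTITY`.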
[GitHub] merlimat opened a new pull request #1693: Make SchemaSerializationException an unchecked exception
merlimat opened a new pull request #1693: Make SchemaSerializationException an unchecked exception
URL: https://github.com/apache/incubator-pulsar/pull/1693

### Motivation

The exception is thrown during serialization only when there is a severe misconfiguration in the Schema implementation, so we can avoid forcing the application to handle it.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
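The effect of moving from a checked to an unchecked exception can be sketched like this. `SketchSerializationException` and `IntCodec` are hypothetical names made up for the example; they only mirror the idea in this message and are not Pulsar classes.

```java
import java.nio.ByteBuffer;

// Hypothetical exception type: extending RuntimeException makes it unchecked.
class SketchSerializationException extends RuntimeException {
    SketchSerializationException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical codec for 4-byte big-endian integers.
class IntCodec {
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // No `throws` clause is required because the exception is unchecked,
    // so callers are not forced to wrap every call in try/catch.
    static int decode(byte[] bytes) {
        if (bytes == null || bytes.length != 4) {
            throw new SketchSerializationException(
                    new IllegalArgumentException("expected exactly 4 bytes"));
        }
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

A caller can write `int v = IntCodec.decode(IntCodec.encode(42));` without any exception handling, and can still catch `SketchSerializationException` where a misconfiguration needs to be reported.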
[incubator-pulsar] branch master updated: Pulsar Functions for Python docs (#1482)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 240f533 Pulsar Functions for Python docs (#1482) 240f533 is described below commit 240f5332d573356ce3e139a38c50845956b59dbf Author: Luc PerkinsAuthorDate: Mon Apr 30 14:56:24 2018 -0700 Pulsar Functions for Python docs (#1482) * Add initial sections to Python API docs * add reference to specific python version * add missing java examples and complete java user config section * fix incorrect example command * add new sections to python API doc * add context object section * add python SerDe section * add section on logging * approaching draft of python docs * add some python examples to overview page * fix error in package name * fix dangling sentence --- site/_config.yml | 1 + site/docs/latest/functions/api.md| 246 +-- site/docs/latest/functions/guarantees.md | 10 +- site/docs/latest/functions/overview.md | 33 - 4 files changed, 272 insertions(+), 18 deletions(-) diff --git a/site/_config.yml b/site/_config.yml index 8995e52..b154071 100644 --- a/site/_config.yml +++ b/site/_config.yml @@ -27,6 +27,7 @@ destination: ../generated-site/content preview_version_id: 20180426.125800-32 current_version: 1.22.0-incubating +python_latest: "1.22.0" archived_releases: - 1.21.0-incubating - 1.20.0-incubating diff --git a/site/docs/latest/functions/api.md b/site/docs/latest/functions/api.md index 6f3ccbc..ca45b94 100644 --- a/site/docs/latest/functions/api.md +++ b/site/docs/latest/functions/api.md @@ -139,7 +139,7 @@ class WordFilter(Function): Writing Pulsar Functions in Java involves implementing one of two interfaces: * The [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface -* The {% javadoc Function client org.apache.pulsar.functions.api.Function %} interface. 
This interface works much like the `java.util.Function` ihterface, but with the important difference that it provides a {% javadoc Context client org.apache.pulsar.functions.api.Context %} object that you can use in a [variety of ways](#context) +* The {% javadoc Function client org.apache.pulsar.functions.api.Function %} interface. This interface works much like the `java.util.Function` interface, but with the important difference that it provides a {% javadoc Context client org.apache.pulsar.functions.api.Context %} object that you can use in a [variety of ways](#context) ### Getting started @@ -149,8 +149,8 @@ In order to write Pulsar Functions in Java, you'll need to install the proper [d How you get started writing Pulsar Functions in Java depends on which API you're using: -* If you're writing [Java native function](#java-native), you won't need any external dependencies. -* If you're writing a [Java SDK](#java-sdk) function, you'll need to import the `pulsar-functions-api` library. +* If you're writing a [Java native function](#java-native), you won't need any external dependencies. +* If you're writing a [Java SDK function](#java-sdk), you'll need to import the `pulsar-functions-api` library. 
Here's an example for a Maven `pom.xml` configuration file: @@ -178,7 +178,7 @@ Whether you're writing Java Pulsar Functions using the [native](#java-native) Ja ### Java native functions {#java-native} -If your function doesn't require access to its [context](#context), you can create a Pulsar Function by implementing the [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface, which has this very simple, single-method signature: +If your function doesn't require access to its [context](#java-context), you can create a Pulsar Function by implementing the [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface, which has this very simple, single-method signature: ```java public interface Function { @@ -205,7 +205,7 @@ In general, you should use native functions when you don't need access to the fu There is one example Java native function in [this folder](https://github.com/apache/incubator-pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples): -* [`ExclamationFunction`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java) +*
[GitHub] sijie closed pull request #1482: Pulsar Functions for Python docs
sijie closed pull request #1482: Pulsar Functions for Python docs URL: https://github.com/apache/incubator-pulsar/pull/1482 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/site/_config.yml b/site/_config.yml index 8995e52dff..b154071ab5 100644 --- a/site/_config.yml +++ b/site/_config.yml @@ -27,6 +27,7 @@ destination: ../generated-site/content preview_version_id: 20180426.125800-32 current_version: 1.22.0-incubating +python_latest: "1.22.0" archived_releases: - 1.21.0-incubating - 1.20.0-incubating diff --git a/site/docs/latest/functions/api.md b/site/docs/latest/functions/api.md index 6f3ccbc715..ca45b94aef 100644 --- a/site/docs/latest/functions/api.md +++ b/site/docs/latest/functions/api.md @@ -139,7 +139,7 @@ class WordFilter(Function): Writing Pulsar Functions in Java involves implementing one of two interfaces: * The [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface -* The {% javadoc Function client org.apache.pulsar.functions.api.Function %} interface. This interface works much like the `java.util.Function` ihterface, but with the important difference that it provides a {% javadoc Context client org.apache.pulsar.functions.api.Context %} object that you can use in a [variety of ways](#context) +* The {% javadoc Function client org.apache.pulsar.functions.api.Function %} interface. 
This interface works much like the `java.util.Function` interface, but with the important difference that it provides a {% javadoc Context client org.apache.pulsar.functions.api.Context %} object that you can use in a [variety of ways](#context) ### Getting started @@ -149,8 +149,8 @@ In order to write Pulsar Functions in Java, you'll need to install the proper [d How you get started writing Pulsar Functions in Java depends on which API you're using: -* If you're writing [Java native function](#java-native), you won't need any external dependencies. -* If you're writing a [Java SDK](#java-sdk) function, you'll need to import the `pulsar-functions-api` library. +* If you're writing a [Java native function](#java-native), you won't need any external dependencies. +* If you're writing a [Java SDK function](#java-sdk), you'll need to import the `pulsar-functions-api` library. Here's an example for a Maven `pom.xml` configuration file: @@ -178,7 +178,7 @@ Whether you're writing Java Pulsar Functions using the [native](#java-native) Ja ### Java native functions {#java-native} -If your function doesn't require access to its [context](#context), you can create a Pulsar Function by implementing the [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface, which has this very simple, single-method signature: +If your function doesn't require access to its [context](#java-context), you can create a Pulsar Function by implementing the [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface, which has this very simple, single-method signature: ```java public interface Function { @@ -205,7 +205,7 @@ In general, you should use native functions when you don't need access to the fu There is one example Java native function in [this 
folder](https://github.com/apache/incubator-pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples): -* [`ExclamationFunction`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java) +* [`JavaNativeExclmationFunction`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java) ### Java SDK functions {#java-sdk} @@ -219,11 +219,18 @@ There are several example Java SDK functions in [this folder](https://github.com Function name | Description :-|:--- -[`ContextFunction`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java) | Illustrate [context](#context)-specific functionality like [logging](#java-logging) and [metrics](#java-metrics) +[`ContextFunction`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java) | Illustrates [context](#context)-specific functionality like [logging](#java-logging) and [metrics](#java-metrics)
[GitHub] srkukarni commented on issue #1688: Check if javaInstance is created in the first place before invocing close
srkukarni commented on issue #1688: Check if javaInstance is created in the first place before invocing close URL: https://github.com/apache/incubator-pulsar/pull/1688#issuecomment-385540555 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mgodave opened a new pull request #1692: Return an error if schema is incompatible
mgodave opened a new pull request #1692: Return an error if schema is incompatible URL: https://github.com/apache/incubator-pulsar/pull/1692 We added the notion of schema "compatibility" but did not handle an incompatible schema in the REST interface. This is to handle that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #1689: When preparing pulsar-build Docker image, ensure apt-get update is not cached
merlimat closed pull request #1689: When preparing pulsar-build Docker image, ensure apt-get update is not cached
URL: https://github.com/apache/incubator-pulsar/pull/1689

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index e94837546b..35d04a5b51 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -23,8 +23,8 @@ FROM ubuntu:16.04
 RUN mkdir /pulsar
 ADD protobuf.patch /pulsar

-RUN apt-get update
-RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
+RUN apt-get update && \
+    apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
     liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
     libjsoncpp-dev libxml2-utils protobuf-compiler wget \
     curl doxygen openjdk-8-jdk-headless clang-format-5.0 \

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] srkukarni opened a new pull request #1691: Key the download directory by the instance id
srkukarni opened a new pull request #1691: Key the download directory by the instance id
URL: https://github.com/apache/incubator-pulsar/pull/1691

### Motivation

If a worker is assigned more than one instance of a particular function, the download paths of the function package currently clash with each other. This PR also keys the download path by instance id to eliminate that clash.

### Modifications

Describe the modifications you've done.

### Result

After your change, what will change.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
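The clash described above, and the effect of adding the instance id to the path, can be sketched as follows. The directory layout (`baseDir/tenant/namespace/functionName/instanceId`) and the names `DownloadPaths` and `forInstance` are assumptions made for illustration; the actual layout used by the worker is not shown in this message.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the real worker code and directory layout may differ.
class DownloadPaths {
    // Builds a per-instance download directory so that two instances of the
    // same function on one worker never share a path.
    static Path forInstance(String baseDir, String tenant, String namespace,
                            String functionName, int instanceId) {
        return Paths.get(baseDir, tenant, namespace, functionName,
                Integer.toString(instanceId));
    }
}
```

Without the trailing instance id component, instances 0 and 1 of the same function would resolve to the same directory, which is the clash the PR describes.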
[GitHub] srkukarni commented on issue #1691: Key the download directory by the instance id
srkukarni commented on issue #1691: Key the download directory by the instance id URL: https://github.com/apache/incubator-pulsar/pull/1691#issuecomment-385539924 @sijie @jerrypeng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lucperkins opened a new pull request #1690: Fix Golang setup in Dockerfile
lucperkins opened a new pull request #1690: Fix Golang setup in Dockerfile
URL: https://github.com/apache/incubator-pulsar/pull/1690

I've verified this new setup:

```bash
$ docker build -t pulsar-build build/docker
$ docker run -it pulsar-build go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc
$ docker run -it pulsar-build protoc-gen-doc -h
Usage of protoc-gen-doc:
# Etc.
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[incubator-pulsar] branch master updated: Introduced TypedMessageBuilder (#1683)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0f2f478 Introduced TypedMessageBuilder (#1683) 0f2f478 is described below commit 0f2f478c1ffc4028d7fd271a1bc523b3342a07af Author: Matteo MerliAuthorDate: Mon Apr 30 14:03:06 2018 -0700 Introduced TypedMessageBuilder (#1683) * Introduced TypedMessageBuilder * Converted parts of the tests to use TypedMessageBuilder * Fixed tests --- .../pulsar/broker/service/BatchMessageTest.java| 45 ++ .../pulsar/broker/service/PartitionKeyTest.java| 9 +- .../broker/service/PersistentTopicE2ETest.java | 5 +- .../pulsar/client/api/ClientDeduplicationTest.java | 14 +- .../api/PartitionedProducerConsumerTest.java | 9 +- .../client/api/SimpleProducerConsumerTest.java | 16 +-- .../apache/pulsar/client/impl/RawReaderTest.java | 36 ++--- .../apache/pulsar/compaction/CompactionTest.java | 100 + .../clients/producer/PulsarKafkaProducer.java | 37 ++--- .../kafka/compat/tests/KafkaConsumerTest.java | 26 +--- .../org/apache/pulsar/client/cli/CmdProduce.java | 47 ++- .../apache/pulsar/client/api/MessageBuilder.java | 11 +- .../org/apache/pulsar/client/api/Producer.java | 42 +- .../pulsar/client/api/TypedMessageBuilder.java | 155 + .../client/impl/PartitionedProducerImpl.java | 19 ++- .../apache/pulsar/client/impl/ProducerBase.java| 51 +-- .../apache/pulsar/client/impl/ProducerImpl.java| 28 +--- .../client/impl/TypedMessageBuilderImpl.java | 136 ++ .../apache/pulsar/websocket/ProducerHandler.java | 22 +-- 19 files changed, 524 insertions(+), 284 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 87187c6..5be8b15 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -92,8 +92,7 @@ public class BatchMessageTest extends BrokerTestBase { List sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs; i++) { byte[] message = ("my-message-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -137,8 +136,7 @@ public class BatchMessageTest extends BrokerTestBase { // put a random sleep from 0 to 3 ms Thread.sleep(random.nextInt(4)); byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -173,8 +171,7 @@ public class BatchMessageTest extends BrokerTestBase { // put a random sleep from 0 to 3 ms Thread.sleep(random.nextInt(4)); byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -209,16 +206,13 @@ public class BatchMessageTest extends BrokerTestBase { if (i == 25) { // send a large message byte[] largeMessage = new byte[128 * 1024 + 4]; -Message msg = MessageBuilder.create().setContent(largeMessage).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(largeMessage)); } else { byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } } -byte[] message = ("msg-" + "last").getBytes(); -Message 
lastMsg = MessageBuilder.create().setContent(message).build(); +byte[] lastMsg = ("msg-" + "last").getBytes(); sendFutureList.add(producer.sendAsync(lastMsg)); FutureUtil.waitForAll(sendFutureList).get(); @@ -262,8 +256,7 @@ public class
[GitHub] sijie closed pull request #1683: Introduced TypedMessageBuilder
sijie closed pull request #1683: Introduced TypedMessageBuilder URL: https://github.com/apache/incubator-pulsar/pull/1683 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 87187c6486..5be8b150ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -92,8 +92,7 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio ListsendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs; i++) { byte[] message = ("my-message-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -137,8 +136,7 @@ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressio // put a random sleep from 0 to 3 ms Thread.sleep(random.nextInt(4)); byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -173,8 +171,7 @@ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType com // put a random sleep from 0 to 3 ms Thread.sleep(random.nextInt(4)); byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); 
+sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -209,16 +206,13 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType) t if (i == 25) { // send a large message byte[] largeMessage = new byte[128 * 1024 + 4]; -Message msg = MessageBuilder.create().setContent(largeMessage).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(largeMessage)); } else { byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } } -byte[] message = ("msg-" + "last").getBytes(); -Message lastMsg = MessageBuilder.create().setContent(message).build(); +byte[] lastMsg = ("msg-" + "last").getBytes(); sendFutureList.add(producer.sendAsync(lastMsg)); FutureUtil.waitForAll(sendFutureList).get(); @@ -262,8 +256,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType) thr List sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs; i++) { byte[] message = ("msg-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +sendFutureList.add(producer.sendAsync(message)); } FutureUtil.waitForAll(sendFutureList).get(); @@ -310,8 +303,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception { for (int i = 0; i < numMsgs; i++) { byte[] message = ("my-message-" + i).getBytes(); -Message msg = MessageBuilder.create().setContent(message).build(); -producer.send(msg); +producer.send(message); } PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -354,8 +346,7 @@ public void testSimpleBatchProducerConsumer1kMessages() throws Exception { List sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs; i++) { byte[] message = ("msg-" + i).getBytes(); 
-Message msg = MessageBuilder.create().setContent(message).build(); -sendFutureList.add(producer.sendAsync(msg)); +
[incubator-pulsar] branch master updated: Fixed NPE and added test for Schema.STRING (#1685)
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3a6e0e2 Fixed NPE and added test for Schema.STRING (#1685) 3a6e0e2 is described below commit 3a6e0e205de7272585e9978e619dc80638da5cfc Author: Matteo MerliAuthorDate: Mon Apr 30 14:02:16 2018 -0700 Fixed NPE and added test for Schema.STRING (#1685) * Fixed NPE and added test for Schema.STRING * Added license exclusion for generated file SchemaRegistryFormat.java --- pom.xml| 2 + .../apache/pulsar/broker/service/ServerCnx.java| 18 +- .../service/schema/SchemaRegistryServiceImpl.java | 28 ++-- .../service/schema/proto/SchemaRegistryFormat.java | 186 + .../src/main/proto/SchemaRegistryFormat.proto | 6 +- .../broker/service/schema/SchemaServiceTest.java | 8 +- .../apache/pulsar/client/api/SimpleSchemaTest.java | 63 +++ .../java/org/apache/pulsar/client/api/Schema.java | 30 .../pulsar/client/api/schemas/StringSchema.java| 3 +- .../pulsar/client/impl/ConnectionHandler.java | 1 + .../org/apache/pulsar/common/api/Commands.java | 12 +- .../apache/pulsar/common/api/proto/PulsarApi.java | 29 ++-- .../apache/pulsar/common/schema/SchemaInfo.java| 4 +- .../apache/pulsar/common/schema/SchemaType.java| 18 +- pulsar-common/src/main/proto/PulsarApi.proto | 9 +- 15 files changed, 247 insertions(+), 170 deletions(-) diff --git a/pom.xml b/pom.xml index af48238..761c710 100644 --- a/pom.xml +++ b/pom.xml @@ -842,6 +842,7 @@ flexible messaging model and an intuitive client API. **/*.key **/*.csr src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java + src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java bin/proto/* **/*.patch @@ -938,6 +939,7 @@ flexible messaging model and an intuitive client API. 
and are included in source tree for convenience --> src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java + src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java bin/proto/MLDataFormats_pb2.py diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d5a420f..4fac97d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -722,16 +722,14 @@ public class ServerCnx extends PulsarHandler { private static SchemaType getType(PulsarApi.Schema.Type protocolType) { switch (protocolType) { -case Json: -return SchemaType.JSON; -case Avro: -return SchemaType.AVRO; -case Thrift: -return SchemaType.THRIFT; -case Protobuf: -return SchemaType.PROTOBUF; -default: -return SchemaType.NONE; +case None: +return SchemaType.NONE; +case String: +return SchemaType.STRING; +case Json: +return SchemaType.JSON; +default: +return SchemaType.NONE; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 3303332..30e9b47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -148,29 +148,25 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { interface Functions { static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) { switch (type) { -case AVRO: -return SchemaType.AVRO; -case JSON: -return SchemaType.JSON; -case PROTO: -return SchemaType.PROTOBUF; -case THRIFT: -return 
SchemaType.THRIFT; -default: -return SchemaType.NONE; +case NONE: +return SchemaType.NONE; +case STRING: +return
[GitHub] sijie closed pull request #1685: Fixed NPE and added test for Schema.STRING
sijie closed pull request #1685: Fixed NPE and added test for Schema.STRING URL: https://github.com/apache/incubator-pulsar/pull/1685 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index af48238de7..761c710fd2 100644 --- a/pom.xml +++ b/pom.xml @@ -842,6 +842,7 @@ flexible messaging model and an intuitive client API. **/*.key **/*.csr src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java + src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java bin/proto/* **/*.patch @@ -938,6 +939,7 @@ flexible messaging model and an intuitive client API. and are included in source tree for convenience --> src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java + src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java bin/proto/MLDataFormats_pb2.py diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d5a420f635..4fac97d261 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -722,16 +722,14 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { private static SchemaType getType(PulsarApi.Schema.Type protocolType) { switch (protocolType) { -case Json: -return SchemaType.JSON; -case Avro: -return SchemaType.AVRO; -case Thrift: -return SchemaType.THRIFT; -case Protobuf: -return SchemaType.PROTOBUF; -default: -return SchemaType.NONE; +case None: +return SchemaType.NONE; +case 
String: +return SchemaType.STRING; +case Json: +return SchemaType.JSON; +default: +return SchemaType.NONE; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 3303332b16..30e9b4763c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -148,29 +148,25 @@ public void close() throws Exception { interface Functions { static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) { switch (type) { -case AVRO: -return SchemaType.AVRO; -case JSON: -return SchemaType.JSON; -case PROTO: -return SchemaType.PROTOBUF; -case THRIFT: -return SchemaType.THRIFT; -default: -return SchemaType.NONE; +case NONE: +return SchemaType.NONE; +case STRING: +return SchemaType.STRING; +case JSON: +return SchemaType.JSON; +default: +return SchemaType.NONE; } } static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaType type) { switch (type) { -case AVRO: -return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO; +case NONE: +return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; +case STRING: +return SchemaRegistryFormat.SchemaInfo.SchemaType.STRING; case JSON: return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON; -case THRIFT: -return SchemaRegistryFormat.SchemaInfo.SchemaType.THRIFT; -case PROTOBUF: -return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTO; default: return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
[GitHub] sijie commented on issue #1681: additional refactoring to use source interface
sijie commented on issue #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#issuecomment-385527527 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Add tenant and namespace getters to Python context (#1677)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d5780ef Add tenant and namespace getters to Python context (#1677) d5780ef is described below commit d5780ef1deeed6d24748f263a958e3212a603f89 Author: Luc PerkinsAuthorDate: Mon Apr 30 14:00:24 2018 -0700 Add tenant and namespace getters to Python context (#1677) * add tenant and namespace getters on Python context * switch to getInputTopicName * re-name current message methods --- pulsar-client-cpp/python/functions/context.py | 17 + .../java/org/apache/pulsar/functions/api/Context.java | 6 +++--- .../apache/pulsar/functions/instance/ContextImpl.java | 2 +- .../instance/src/main/python/contextimpl.py | 12 +--- 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pulsar-client-cpp/python/functions/context.py b/pulsar-client-cpp/python/functions/context.py index c1f30ed..0257b69 100644 --- a/pulsar-client-cpp/python/functions/context.py +++ b/pulsar-client-cpp/python/functions/context.py @@ -50,9 +50,18 @@ class Context(object): pass @abstractmethod - def get_topic_name(self): + def get_current_message_topic_name(self): """Returns the topic name of the message that we are processing""" pass + + @abstractmethod + def get_function_tenant(self): +"""Returns the tenant of the message that's being processed""" +pass + + @abstractmethod + def get_function_namespace(self): +"""Returns the namespace of the message that's being processed""" @abstractmethod def get_function_name(self): @@ -106,15 +115,15 @@ class Context(object): @abstractmethod def get_output_topic(self): -'''Returns the output topic of function''' +"""Returns the output topic of function""" pass @abstractmethod def get_output_serde_class_name(self): -'''return output Serde class''' +"""return output Serde class""" pass @abstractmethod def ack(self, 
msgid, topic): -'''ack this message id''' +"""ack this message id""" pass \ No newline at end of file diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index c4d78c2..653d176 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -40,10 +40,10 @@ public interface Context { byte[] getMessageId(); /** - * The topic that this message belongs to - * @return The topic name + * The input topic that the message currently being processed belongs to + * @return The input topic name */ -String getTopicName(); +String getCurrentMessageTopicName(); /** * Get a list of all input topics diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 04291b2..7a01d25 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -122,7 +122,7 @@ class ContextImpl implements Context { } @Override -public String getTopicName() { +public String getCurrentMessageTopicName() { return currentTopicName; } diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 6e59306..a841152 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -57,24 +57,30 @@ class ContextImpl(pulsar.Context): self.publish_producers = {} self.publish_serializers = {} self.current_message_id = None -self.current_topic_name = None +self.current_input_topic_name = None self.current_start_time = None # Called 
on a per message basis to set the context for the current message def set_current_message_context(self, msgid, topic): self.current_message_id = msgid -self.current_topic_name = topic +self.current_input_topic_name = topic self.current_start_time = time.time() def get_message_id(self): return self.current_message_id - def get_topic_name(self): + def get_current_message_topic_name(self): return self.current_topic_name def get_function_name(self): return self.instance_config.function_details.name + def get_function_tenant(self): +return
[GitHub] merlimat closed pull request #1677: Add tenant and namespace getters to Python context
merlimat closed pull request #1677: Add tenant and namespace getters to Python context URL: https://github.com/apache/incubator-pulsar/pull/1677 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/python/functions/context.py b/pulsar-client-cpp/python/functions/context.py index c1f30edc94..0257b69b20 100644 --- a/pulsar-client-cpp/python/functions/context.py +++ b/pulsar-client-cpp/python/functions/context.py @@ -50,9 +50,18 @@ def get_message_id(self): pass @abstractmethod - def get_topic_name(self): + def get_current_message_topic_name(self): """Returns the topic name of the message that we are processing""" pass + + @abstractmethod + def get_function_tenant(self): +"""Returns the tenant of the message that's being processed""" +pass + + @abstractmethod + def get_function_namespace(self): +"""Returns the namespace of the message that's being processed""" @abstractmethod def get_function_name(self): @@ -106,15 +115,15 @@ def publish(self, topic_name, message): @abstractmethod def get_output_topic(self): -'''Returns the output topic of function''' +"""Returns the output topic of function""" pass @abstractmethod def get_output_serde_class_name(self): -'''return output Serde class''' +"""return output Serde class""" pass @abstractmethod def ack(self, msgid, topic): -'''ack this message id''' +"""ack this message id""" pass \ No newline at end of file diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index c4d78c2ade..653d176303 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -40,10 +40,10 @@ byte[] getMessageId(); /** - * The topic that this message belongs to - * @return The topic name + * The input topic that the message currently being processed belongs to + * @return The input topic name */ -String getTopicName(); +String getCurrentMessageTopicName(); /** * Get a list of all input topics diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 04291b24e6..7a01d25b00 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -122,7 +122,7 @@ public void setCurrentMessageContext(MessageId messageId, String topicName) { } @Override -public String getTopicName() { +public String getCurrentMessageTopicName() { return currentTopicName; } diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 6e59306038..a841152366 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -57,24 +57,30 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers) self.publish_producers = {} self.publish_serializers = {} self.current_message_id = None -self.current_topic_name = None +self.current_input_topic_name = None self.current_start_time = None # Called on a per message basis to set the context for the current message def set_current_message_context(self, msgid, topic): self.current_message_id = msgid -self.current_topic_name = topic +self.current_input_topic_name = topic self.current_start_time = time.time() def get_message_id(self): return self.current_message_id - def 
get_topic_name(self): + def get_current_message_topic_name(self): return self.current_topic_name def get_function_name(self): return self.instance_config.function_details.name + def get_function_tenant(self): +return self.instance_config.function_details.tenant + + def get_function_namespace(self): +return self.instance_config.function_details.namespace + def get_function_id(self): return self.instance_config.function_id This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With
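The getters added in this diff can be exercised with a minimal sketch. `SketchContext` and the `SimpleNamespace` stand-in for `function_details` are hypothetical names used for illustration only, not the real `ContextImpl`. Note that the diff renames the attribute to `current_input_topic_name` while `get_current_message_topic_name` still returns `self.current_topic_name`; the sketch reads the renamed attribute, which is an assumption about the intended behaviour.

```python
from types import SimpleNamespace


class SketchContext:
    """Hypothetical stand-in for ContextImpl; not the real Pulsar class."""

    def __init__(self, function_details):
        self.function_details = function_details
        self.current_message_id = None
        self.current_input_topic_name = None

    def set_current_message_context(self, msgid, topic):
        # Called once per message, mirroring the method shown in the diff.
        self.current_message_id = msgid
        self.current_input_topic_name = topic

    def get_current_message_topic_name(self):
        # Assumption: read the renamed attribute, not the old name.
        return self.current_input_topic_name

    def get_function_tenant(self):
        return self.function_details.tenant

    def get_function_namespace(self):
        return self.function_details.namespace


# Illustrative values only.
details = SimpleNamespace(tenant="public", namespace="default", name="example")
ctx = SketchContext(details)
ctx.set_current_message_context(msgid=1, topic="persistent://public/default/in")
print(ctx.get_function_tenant(), ctx.get_function_namespace())
print(ctx.get_current_message_topic_name())
```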
[GitHub] merlimat opened a new pull request #1689: When preparing pulsar-build Docker image, ensure apt-get update is not cached
merlimat opened a new pull request #1689: When preparing pulsar-build Docker image, ensure apt-get update is not cached URL: https://github.com/apache/incubator-pulsar/pull/1689 ### Motivation If `apt-get update` is in a different `RUN` command, Docker will cache it independently from the next step. That can cause `apt-get install` to fail to find some specific versions of the packages.
[incubator-pulsar] branch master updated: Protobuf documentation update (#1686)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 31131c2 Protobuf documentation update (#1686) 31131c2 is described below commit 31131c23644a3e9c9e951a6fb64741c290c84bb1 Author: Luc PerkinsAuthorDate: Mon Apr 30 13:47:28 2018 -0700 Protobuf documentation update (#1686) * use go get for protoc-gen-doc * modify Dockerfile for website build --- build/docker/Dockerfile | 9 +++-- site/.gitignore | 1 - site/Makefile| 6 +- site/scripts/protobuf-doc-gen.sh | 2 +- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile index 2075135..e948375 100644 --- a/build/docker/Dockerfile +++ b/build/docker/Dockerfile @@ -28,7 +28,7 @@ RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \ liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \ libjsoncpp-dev libxml2-utils protobuf-compiler wget \ curl doxygen openjdk-8-jdk-headless clang-format-5.0 \ -gnupg2 +gnupg2 golang-1.10-go # Compile and install gtest RUN cd /usr/src/gtest && cmake . 
&& make && cp libgtest.a /usr/lib @@ -54,11 +54,8 @@ RUN rvm install 2.4.1 RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py RUN pip install pdoc -# Protogen doc generator -RUN wget https://github.com/pseudomuto/protoc-gen-doc/releases/download/v1.0.0-alpha/protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz && \ -tar xvfz protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz && \ -cp protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1/protoc-gen-doc /usr/local/bin && \ -rm protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz +# Install Protobuf doc generator +RUN go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc # Build the patched protoc RUN git clone https://github.com/google/protobuf.git /pulsar/protobuf && \ diff --git a/site/.gitignore b/site/.gitignore index 2a2780d..3ebc224 100644 --- a/site/.gitignore +++ b/site/.gitignore @@ -12,6 +12,5 @@ generated/ _data/config/*.bak _data/admin-rest-api-swagger.json _data/protobuf.json -scripts/protoc-gen-doc scripts/doxygen scripts/htmltest-* diff --git a/site/Makefile b/site/Makefile index 970959d..1fe6253 100644 --- a/site/Makefile +++ b/site/Makefile @@ -87,11 +87,7 @@ protobuf_doc_gen: scripts/protobuf-doc-gen.sh protobuf_setup: - rm -rf protoc-gen-doc scripts/protoc-gen-doc - git clone https://github.com/pseudomuto/protoc-gen-doc - rm -rf protoc-gen-doc/.git - mv protoc-gen-doc scripts - (cd scripts/protoc-gen-doc && PROTOBUF_PREFIX=$(shell brew --prefix protobuf) qmake && make) + go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc api_docs: javadoc python_doc_gen cpp_doc_gen diff --git a/site/scripts/protobuf-doc-gen.sh b/site/scripts/protobuf-doc-gen.sh index 0dfa2c0..01670ba 100755 --- a/site/scripts/protobuf-doc-gen.sh +++ b/site/scripts/protobuf-doc-gen.sh @@ -23,6 +23,6 @@ ROOT_DIR=$(git rev-parse --show-toplevel) PROTO_FILE=pulsar-common/src/main/proto/PulsarApi.proto ( - cd $(git rev-parse --show-toplevel) + cd $ROOT_DIR protoc 
--doc_out=json,protobuf.json:site/_data/ $PROTO_FILE ) -- To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] merlimat closed pull request #1686: Protobuf documentation update
merlimat closed pull request #1686: Protobuf documentation update URL: https://github.com/apache/incubator-pulsar/pull/1686 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile index 2075135243..e94837546b 100644 --- a/build/docker/Dockerfile +++ b/build/docker/Dockerfile @@ -28,7 +28,7 @@ RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \ liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \ libjsoncpp-dev libxml2-utils protobuf-compiler wget \ curl doxygen openjdk-8-jdk-headless clang-format-5.0 \ -gnupg2 +gnupg2 golang-1.10-go # Compile and install gtest RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib @@ -54,11 +54,8 @@ RUN rvm install 2.4.1 RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py RUN pip install pdoc -# Protogen doc generator -RUN wget https://github.com/pseudomuto/protoc-gen-doc/releases/download/v1.0.0-alpha/protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz && \ -tar xvfz protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz && \ -cp protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1/protoc-gen-doc /usr/local/bin && \ -rm protoc-gen-doc-1.0.0-alpha.linux-amd64.go1.8.1.tar.gz +# Install Protobuf doc generator +RUN go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc # Build the patched protoc RUN git clone https://github.com/google/protobuf.git /pulsar/protobuf && \ diff --git a/site/.gitignore b/site/.gitignore index 2a2780d331..3ebc2241d3 100644 --- a/site/.gitignore +++ b/site/.gitignore @@ -12,6 +12,5 @@ generated/ _data/config/*.bak _data/admin-rest-api-swagger.json _data/protobuf.json -scripts/protoc-gen-doc scripts/doxygen scripts/htmltest-* diff --git a/site/Makefile 
b/site/Makefile index 970959d814..1fe6253c34 100644 --- a/site/Makefile +++ b/site/Makefile @@ -87,11 +87,7 @@ protobuf_doc_gen: scripts/protobuf-doc-gen.sh protobuf_setup: - rm -rf protoc-gen-doc scripts/protoc-gen-doc - git clone https://github.com/pseudomuto/protoc-gen-doc - rm -rf protoc-gen-doc/.git - mv protoc-gen-doc scripts - (cd scripts/protoc-gen-doc && PROTOBUF_PREFIX=$(shell brew --prefix protobuf) qmake && make) + go get -u github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc api_docs: javadoc python_doc_gen cpp_doc_gen diff --git a/site/scripts/protobuf-doc-gen.sh b/site/scripts/protobuf-doc-gen.sh index 0dfa2c0a16..01670ba5dd 100755 --- a/site/scripts/protobuf-doc-gen.sh +++ b/site/scripts/protobuf-doc-gen.sh @@ -23,6 +23,6 @@ ROOT_DIR=$(git rev-parse --show-toplevel) PROTO_FILE=pulsar-common/src/main/proto/PulsarApi.proto ( - cd $(git rev-parse --show-toplevel) + cd $ROOT_DIR protoc --doc_out=json,protobuf.json:site/_data/ $PROTO_FILE ) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] srkukarni opened a new pull request #1688: Check if javaInstance is created in the first place before invocing close
srkukarni opened a new pull request #1688: Check if javaInstance is created in the first place before invocing close URL: https://github.com/apache/incubator-pulsar/pull/1688 ### Motivation If there is an issue setting up javaInstance (for example, if the class path is not found), setupJavaInstance throws an exception, which triggers close, which in turn causes an NPE because javaInstance was never created. This PR fixes that.
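The failure mode described in this PR can be sketched in a few lines. The actual change is in Java code that is not shown in this message, so the example below is only a Python illustration of the guard; `InstanceRunner`, `failing_factory`, and the error text are hypothetical.

```python
class InstanceRunner:
    """Hypothetical runner; names are illustrative, not Pulsar's API."""

    def __init__(self, factory):
        self._factory = factory
        self.instance = None

    def run(self):
        try:
            # Setup may raise before self.instance is ever assigned.
            self.instance = self._factory()
        except Exception:
            self.close()
            raise

    def close(self):
        # Only close the instance if setup got far enough to create it.
        if self.instance is not None:
            self.instance.close()
            self.instance = None


def failing_factory():
    raise ImportError("class path not found")


runner = InstanceRunner(failing_factory)
try:
    runner.run()
except ImportError as exc:
    # The original setup error surfaces instead of a secondary null error.
    outcome = f"setup failed cleanly: {exc}"
print(outcome)
```

Without the `is not None` check, `close` would fail on the missing instance and mask the original setup error.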
[GitHub] srkukarni commented on issue #1688: Check if javaInstance is created in the first place before invocing close
srkukarni commented on issue #1688: Check if javaInstance is created in the first place before invocing close URL: https://github.com/apache/incubator-pulsar/pull/1688#issuecomment-385521411 @sijie @jerrypeng
[GitHub] ivankelly opened a new pull request #1687: Don't offload empty ledgers
ivankelly opened a new pull request #1687: Don't offload empty ledgers URL: https://github.com/apache/incubator-pulsar/pull/1687 It shouldn't be possible for a ledger in a managed ledger to be empty (it should be cleaned up on recovery), but this patch adds defensive code so that if empty ledgers do exist for some reason, they won't be offloaded. Master Issue: #1511
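The defensive check described here amounts to filtering out empty ledgers before offloading. The patch itself is not included in this message, so the following is only a Python sketch of the idea; `LedgerInfo`, its `entries` field, and `ledgers_to_offload` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class LedgerInfo:
    """Hypothetical ledger record; field names are assumptions."""
    ledger_id: int
    entries: int


def ledgers_to_offload(ledgers):
    """Return the ledgers worth offloading, skipping any with no entries."""
    return [ledger for ledger in ledgers if ledger.entries > 0]


candidates = [LedgerInfo(1, 100), LedgerInfo(2, 0), LedgerInfo(3, 7)]
selected = ledgers_to_offload(candidates)
print([ledger.ledger_id for ledger in selected])
```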
[GitHub] lucperkins commented on issue #1670: Check for tenant and namespace for pulsar-admin functions list command
lucperkins commented on issue #1670: Check for tenant and namespace for pulsar-admin functions list command URL: https://github.com/apache/incubator-pulsar/pull/1670#issuecomment-385517524 retest this please
[GitHub] jerrypeng commented on issue #1681: additional refactoring to use source interface
jerrypeng commented on issue #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#issuecomment-385514330 @srkukarni thanks for the review. I have addressed your comments.
[GitHub] jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185092763
## File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java ##
@@ -104,15 +99,18 @@
 @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
 protected String autoAck = "true";
-@Parameter(names = "--subscription_type", description = "What subscription type to use")
-protected FunctionDetails.SubscriptionType subscriptionType;
-
-@Parameter(names = "--source_classname", description = "The source classname")
+@Parameter(names = "--source_classname", description = "The source classname", required = true)
 protected String sourceClassname;
-@Parameter(names = "--source_configs", description = "The source classname")
+@Parameter(names = "--source_configs", description = "The source configs")
 protected String sourceConfigs;
+@Parameter(names = "--source_subscription_type", description = "The source configs", required = true)
+protected String sourceSubscriptionType;
+
+@Parameter(names = "--source_topics_serde_classname", description = "The source configs", required = true)
Review comment: sounds good
[GitHub] jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185092730
## File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java ##
@@ -104,15 +99,18 @@
 @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
 protected String autoAck = "true";
-@Parameter(names = "--subscription_type", description = "What subscription type to use")
-protected FunctionDetails.SubscriptionType subscriptionType;
-
-@Parameter(names = "--source_classname", description = "The source classname")
+@Parameter(names = "--source_classname", description = "The source classname", required = true)
 protected String sourceClassname;
-@Parameter(names = "--source_configs", description = "The source classname")
+@Parameter(names = "--source_configs", description = "The source configs")
 protected String sourceConfigs;
+@Parameter(names = "--source_subscription_type", description = "The source configs", required = true)
Review comment: sounds good
[GitHub] jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185092339 ## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ## @@ -271,9 +277,13 @@ void processArguments() throws Exception { } if (null != processingGuarantees) { functionConfig.setProcessingGuarantees(processingGuarantees); +} else if (functionConfig.getProcessingGuarantees() == null) { + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } if (null != subscriptionType) { functionConfig.setSubscriptionType(subscriptionType); +} else if (functionConfig.getSubscriptionType() == null) { Review comment: sounds good
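Editor's note on the `CmdFunctions` thread: the fallback order in the quoted hunk (command-line flag first, then any value already present in the function config, then a hard-coded default) can be sketched as below. This is a minimal Python sketch, not the Java code from the PR. `resolve_setting` is a hypothetical helper name; only the `ATLEAST_ONCE` default appears in the diff, and the default for the subscription type is not visible in the quoted hunk, so it is left out.

```python
from typing import Optional

# Default taken from the quoted diff; everything else here is illustrative.
DEFAULT_PROCESSING_GUARANTEES = "ATLEAST_ONCE"


def resolve_setting(cli_value: Optional[str],
                    config_value: Optional[str],
                    default: str) -> str:
    """Return the CLI value if given, else the config value, else the default."""
    if cli_value is not None:
        return cli_value
    if config_value is not None:
        return config_value
    return default


# A flag on the command line wins over the config file.
print(resolve_setting("ATMOST_ONCE", "ATLEAST_ONCE", DEFAULT_PROCESSING_GUARANTEES))
# With neither set, the hard-coded default applies.
print(resolve_setting(None, None, DEFAULT_PROCESSING_GUARANTEES))
```

The review point in this thread is about the last branch: if the config object were a proto3 message, an unset enum field would already read back as its zero value rather than null, which would make the explicit fallback redundant.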
[GitHub] jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface
jerrypeng commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185092312 ## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ## @@ -271,9 +277,13 @@ void processArguments() throws Exception { } if (null != processingGuarantees) { functionConfig.setProcessingGuarantees(processingGuarantees); +} else if (functionConfig.getProcessingGuarantees() == null) { Review comment: sounds good
[GitHub] srkukarni commented on a change in pull request #1681: additional refactoring to use source interface
srkukarni commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185090654 ## File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java ## @@ -104,15 +99,18 @@ @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; -@Parameter(names = "--subscription_type", description = "What subscription type to use") -protected FunctionDetails.SubscriptionType subscriptionType; - -@Parameter(names = "--source_classname", description = "The source classname") +@Parameter(names = "--source_classname", description = "The source classname", required = true) protected String sourceClassname; -@Parameter(names = "--source_configs", description = "The source classname") +@Parameter(names = "--source_configs", description = "The source configs") protected String sourceConfigs; +@Parameter(names = "--source_subscription_type", description = "The source configs", required = true) Review comment: description needs to be changed
[GitHub] srkukarni commented on a change in pull request #1681: additional refactoring to use source interface
srkukarni commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185090684 ## File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java ## @@ -104,15 +99,18 @@ @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; -@Parameter(names = "--subscription_type", description = "What subscription type to use") -protected FunctionDetails.SubscriptionType subscriptionType; - -@Parameter(names = "--source_classname", description = "The source classname") +@Parameter(names = "--source_classname", description = "The source classname", required = true) protected String sourceClassname; -@Parameter(names = "--source_configs", description = "The source classname") +@Parameter(names = "--source_configs", description = "The source configs") protected String sourceConfigs; +@Parameter(names = "--source_subscription_type", description = "The source configs", required = true) +protected String sourceSubscriptionType; + +@Parameter(names = "--source_topics_serde_classname", description = "The source configs", required = true) Review comment: same as above
[GitHub] srkukarni commented on a change in pull request #1681: additional refactoring to use source interface
srkukarni commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185085957 ## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ## @@ -271,9 +277,13 @@ void processArguments() throws Exception { } if (null != processingGuarantees) { functionConfig.setProcessingGuarantees(processingGuarantees); +} else if (functionConfig.getProcessingGuarantees() == null) { Review comment: this can be removed, since by proto3 convention enum fields already have a default value
[GitHub] lucperkins opened a new pull request #1686: Protobuf documentation update
lucperkins opened a new pull request #1686: Protobuf documentation update URL: https://github.com/apache/incubator-pulsar/pull/1686 This PR changes the website build setup to include an easier installation of the `protoc-gen-doc` tool.
[GitHub] srkukarni commented on a change in pull request #1681: additional refactoring to use source interface
srkukarni commented on a change in pull request #1681: additional refactoring to use source interface URL: https://github.com/apache/incubator-pulsar/pull/1681#discussion_r185085970 ## File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java ## @@ -271,9 +277,13 @@ void processArguments() throws Exception { } if (null != processingGuarantees) { functionConfig.setProcessingGuarantees(processingGuarantees); +} else if (functionConfig.getProcessingGuarantees() == null) { + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } if (null != subscriptionType) { functionConfig.setSubscriptionType(subscriptionType); +} else if (functionConfig.getSubscriptionType() == null) { Review comment: same as above
[GitHub] merlimat commented on a change in pull request #1684: Update REST API docs
merlimat commented on a change in pull request #1684: Update REST API docs URL: https://github.com/apache/incubator-pulsar/pull/1684#discussion_r185081707 ## File path: site/_data/admin-rest-api-swagger.json ## @@ -247,6 +247,26 @@ } } }, +"/broker-stats/topics" : { Review comment: Nice!
[GitHub] lucperkins commented on a change in pull request #1684: Update REST API docs
lucperkins commented on a change in pull request #1684: Update REST API docs URL: https://github.com/apache/incubator-pulsar/pull/1684#discussion_r185081053 ## File path: site/_data/admin-rest-api-swagger.json ## @@ -247,6 +247,26 @@ } } }, +"/broker-stats/topics" : { Review comment: @merlimat I've updated the site generation script to include Swagger definition generation and removed the generated JSON from Git. If Jenkins uses `make publish` to build the site then you shouldn't need to update the Jenkins config.
[GitHub] merlimat commented on issue #1656: Add admin api to delete topic forcefully
merlimat commented on issue #1656: Add admin api to delete topic forcefully URL: https://github.com/apache/incubator-pulsar/pull/1656#issuecomment-385497790 @rdhabalia Last week I pushed changes that add a `pulsar-admin topics` subcommand. Can you add the `--force` flag (maybe even with a `-f` alias) to `CmdTopics` as well?
[GitHub] merlimat commented on a change in pull request #1684: Update REST API docs
merlimat commented on a change in pull request #1684: Update REST API docs URL: https://github.com/apache/incubator-pulsar/pull/1684#discussion_r185078995 ## File path: site/_data/admin-rest-api-swagger.json ## @@ -247,6 +247,26 @@ } } }, +"/broker-stats/topics" : { Review comment: @lucperkins can we hook the swagger update in the website build and delete this file (which is bound to always be outdated) from the git repo? I can update the Jenkins job config if needed.
[GitHub] lucperkins commented on a change in pull request #1683: Introduced TypedMessageBuilder
lucperkins commented on a change in pull request #1683: Introduced TypedMessageBuilder URL: https://github.com/apache/incubator-pulsar/pull/1683#discussion_r185072449 ## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java ## @@ -177,21 +177,18 @@ public void testKeyBasedProducer() throws Exception { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); -Message msg = null; for (int i = 0; i < 5; i++) { String message = "my-message-" + i; -msg = MessageBuilder.create().setContent(message.getBytes()).setKey(dummyKey1).build(); -producer.send(msg); + producer.newMessage().key(dummyKey1).value(message.getBytes()).send(); } for (int i = 5; i < 10; i++) { String message = "my-message-" + i; -msg = MessageBuilder.create().setContent(message.getBytes()).setKey(dummyKey2).build(); -producer.send(msg); + producer.newMessage().key(dummyKey2).value(message.getBytes()).send(); } Set messageSet = Sets.newHashSet(); for (int i = 0; i < 10; i++) { -msg = consumer.receive(5, TimeUnit.SECONDS); +Message msg = consumer.receive(5, TimeUnit.SECONDS); Review comment: Syntax issue (` msg`)
[GitHub] merlimat commented on a change in pull request #1553: Pulsar 2.0 docs
merlimat commented on a change in pull request #1553: Pulsar 2.0 docs URL: https://github.com/apache/incubator-pulsar/pull/1553#discussion_r185071798 ## File path: site/_config.yml ## @@ -25,9 +25,10 @@ pulsar_repo: https://github.com/apache/incubator-pulsar/tree/master baseurl: / destination: ../generated-site/content -preview_version_id: 20180426.125800-32 -current_version: 1.22.0-incubating +preview_version_id: 20180330.125917-6 +current_version: "2.0.0" Review comment: @lucperkins I think some of my earlier comments got buried. The moment we change the current version, the download links change as well, so that should only happen once the release is finally approved. I think we should leave 1.22 here and merge these docs changes. We'll change the current version as part of the release process.
[GitHub] merlimat opened a new pull request #1685: Fixed NPE and added test for Schema.STRING
merlimat opened a new pull request #1685: Fixed NPE and added test for Schema.STRING URL: https://github.com/apache/incubator-pulsar/pull/1685 ### Motivation Fixed NullPointerException when using `StringSchema` ### Modifications * Added `StringSchema` as `Schema.STRING` * Added String in Schema type enumerators * Removed types not yet implemented * Added simple test using String schema cc/ @mgodave @lucperkins
[GitHub] lucperkins opened a new pull request #1684: Update REST API docs
lucperkins opened a new pull request #1684: Update REST API docs URL: https://github.com/apache/incubator-pulsar/pull/1684 At the moment, updating the REST API docs via Swagger is a manual process. This PR is the first update in several months (in light of some important changes).
[GitHub] lucperkins commented on issue #1482: Pulsar Functions for Python docs
lucperkins commented on issue #1482: Pulsar Functions for Python docs URL: https://github.com/apache/incubator-pulsar/pull/1482#issuecomment-385480279 @srkukarni Fixed
[GitHub] ivankelly commented on issue #1639: Rest API for Ledger Offloading
ivankelly commented on issue #1639: Rest API for Ledger Offloading URL: https://github.com/apache/incubator-pulsar/pull/1639#issuecomment-385476742 retest this please // PersistentFailoverE2ETest.testSimpleConsumerEventsWithoutPartition
[GitHub] lucperkins commented on a change in pull request #1677: Add tenant and namespace getters to Python context
lucperkins commented on a change in pull request #1677: Add tenant and namespace getters to Python context URL: https://github.com/apache/incubator-pulsar/pull/1677#discussion_r185055849 ## File path: pulsar-client-cpp/python/functions/context.py ## @@ -50,9 +50,18 @@ def get_message_id(self): pass @abstractmethod - def get_topic_name(self): + def get_input_topic_name(self): Review comment: Good idea. I'll update.
[GitHub] srkukarni commented on a change in pull request #1677: Add tenant and namespace getters to Python context
srkukarni commented on a change in pull request #1677: Add tenant and namespace getters to Python context URL: https://github.com/apache/incubator-pulsar/pull/1677#discussion_r185055226 ## File path: pulsar-client-cpp/python/functions/context.py ## @@ -50,9 +50,18 @@ def get_message_id(self): pass @abstractmethod - def get_topic_name(self): + def get_input_topic_name(self): Review comment: Actually, the method applies to the message that is currently being processed. So maybe we should rename it to `get_current_msg_topic_name` or something equivalent?
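Editor's note on the naming discussion in this thread: the point is that the getter reports the topic of the message currently being processed, not a fixed input topic. A minimal sketch of what that looks like on an abstract context follows. The class names and the exact method name `get_current_message_topic_name` are hypothetical; the thread only proposes "get_current_msg_topic_name or something equivalent", and the real `context.py` has more methods than shown here.

```python
from abc import ABC, abstractmethod


class Context(ABC):
    """Hypothetical, trimmed-down stand-in for the abstract context in context.py."""

    @abstractmethod
    def get_current_message_topic_name(self):
        """Return the topic of the message currently being processed."""


class SingleMessageContext(Context):
    """Toy implementation used only to exercise the interface."""

    def __init__(self, topic_name):
        self._topic_name = topic_name

    def get_current_message_topic_name(self):
        return self._topic_name


ctx = SingleMessageContext("persistent://sample/standalone/ns1/input-1")
print(ctx.get_current_message_topic_name())
```

With a function subscribed to several input topics, the value can differ from one message to the next, which is why a per-message name reads less ambiguously than `get_topic_name`.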
[GitHub] srkukarni commented on a change in pull request #1482: Pulsar Functions for Python docs
srkukarni commented on a change in pull request #1482: Pulsar Functions for Python docs URL: https://github.com/apache/incubator-pulsar/pull/1482#discussion_r185053772 ## File path: site/docs/latest/functions/api.md ## @@ -474,4 +495,205 @@ public class MetricRecorderFunction implements Function{ ## Pulsar Functions for Python {#python} -Documentation for the Python SDK for Pulsar Functions is coming soon. \ No newline at end of file +Writing Pulsar Functions in Python entails implementing one of two things: + +* A `process` function that takes an input (message data from the function's input topic(s)), applies some kind of logic to it, and either returns an object (to be published to the function's output topic) or `pass`es and thus doesn't produce a message +* A `Function` class that has a `process` method that provides a message input to process and a [context](#python-context) object + +### Getting started + +The requirements for writing Pulsar Functions in Python depend on your [deployment mode](../deployment): + +* If you're writing a [Python native function](#python-native), you won't need to install any external dependencies +* If you're writing a [Python SDK function](#python-sdk), you'll need to install the the [`pulsar-client`](/api/python) Python library. + + ```bash + $ pip install pulsar-client=={{ site.python_latest }} + ``` + +### Packaging + +At the moment, the code for Pulsar Functions written in Python must be contained within a single Python file. In the future, Pulsar Functions may support other packaging formats, such as [**P**ython **EX**ecutables](https://github.com/pantsbuild/pex) (PEXes). + +### Python native functions {#python-native} + +If your function doesn't require access to its [context](#context), you can create a Pulsar Function by implementing a `process` function, which provides a single input object that you can process however you wish. 
Here's an example function that takes a string as its input, adds an exclamation point at the end of the string, and then publishes the resulting string: + +```python +def process(input): +return "{0}!".format(input) +``` + +In general, you should use native functions when you don't need access to the function's [context](#context). If you *do* need access to the function's context, then we recommend using the [Pulsar Functions Python SDK](#python-sdk). + + Python native examples + +There is one example Python native function in [this folder](https://github.com/apache/incubator-pulsar/tree/master/pulsar-functions/python-examples): + +* [`pure_python_function_exclamation.py`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/python-examples/pure_python_function_exclamation.py) + +### Python SDK functions {#python-sdk} + +To get started developing Pulsar Functions using the Python SDK, you'll need to install Review comment: broken sentence? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
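Editor's note on the quoted docs: they say a native `process` function either returns an object to be published or produces no message at all. A minimal sketch of both paths in one function is below. It is a variation on the exclamation example in the diff, not code from the PR, and the "skip empty input" rule is an assumption chosen purely for illustration.

```python
def process(input):
    # Assumed rule for illustration: empty messages are dropped.
    # Returning None stands for the "doesn't produce a message" case in the docs.
    if not input:
        return None
    # Otherwise return the value that would be published to the output topic.
    return "{0}!".format(input)


print(process("hello"))
print(process(""))
```

Because the function takes only the input object, it fits the "native" style described in the diff; anything needing the context object would use the SDK's class-based form instead.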
[GitHub] lucperkins closed pull request #1487: Pulsar Functions counters example (WIP)
lucperkins closed pull request #1487: Pulsar Functions counters example (WIP) URL: https://github.com/apache/incubator-pulsar/pull/1487 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java index 9bc25db923..22a24d2999 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java @@ -22,12 +22,13 @@ import org.apache.pulsar.functions.api.Function; import java.util.Arrays; +import java.util.List; public class CounterFunction implements Function{ @Override -public Void process(String input, Context context) throws Exception { -Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1)); - +public Void process(String input, Context context) { +List words = Arrays.asList(input.split("\\.")); +words.forEach(word -> context.incrCounter(word, 1)); return null; } } diff --git a/site/docs/latest/functions/overview.md b/site/docs/latest/functions/overview.md index c3cf23d65d..895c0b8ff1 100644 --- a/site/docs/latest/functions/overview.md +++ b/site/docs/latest/functions/overview.md @@ -183,7 +183,7 @@ You can also mix and match configuration methods by specifying some function att ## Supported languages -Pulsar Functions can currently be written in [Java](../../functions/api#java) and [Python](../../functions/api#python). Support for additional languages is coming soon. 
+Pulsar Functions can be configured in two ways: ## The Pulsar Functions API {#api} @@ -345,6 +345,118 @@ public class ConfigMapFunction implements Function { } ``` +## Deployment modes + +The Pulsar Functions feature was built to support a variety of deployment options. At the moment, there are two ways to run Pulsar Functions: + +Deployment mode | Description +:---|:--- +Local run mode | The function runs in your local environment, for example on your laptop +Cluster mode | The function runs *inside of* your Pulsar cluster, on the same machines as your Pulsar {% popover brokers %} + +### Local run mode {#local-run} + +If you run a Pulsar Function in **local run** mode, it will run on the machine from which the command is run (this could be your laptop, an [AWS EC2](https://aws.amazon.com/ec2/) instance, etc.). Here's an example [`localrun`](../../CliTools#pulsar-admin-functions-localrun) command: + +```bash +$ bin/pulsar-admin functions localrun \ + --py myfunc.py \ + --className myfunc.SomeFunction \ + --inputs persistent://sample/standalone/ns1/input-1 \ + --output persistent://sample/standalone/ns1/output-1 +``` + +By default, the function will connect to a Pulsar cluster running on the same machine, via a local {% popover broker %} service URL of `pulsar://localhost:6650`. If you'd like to use local run mode to run a function but connect it to a non-local Pulsar cluster, you can specify a different broker URL using the `--brokerServiceUrl` flag. Here's an example: + +```bash +$ bin/pulsar-admin functions localrun \ + --brokerServiceUrl pulsar://my-cluster-host:6650 \ + # Other function parameters +``` + +### Cluster run mode {#cluster-run} + +When you run a Pulsar Function in **cluster mode**, the function code will be uploaded to a Pulsar {% popover broker %} and run *alongside the broker* rather than in your [local environment](#local-run). You can run a function in cluster mode using the [`create`](../../CliTools#pulsar-admin-functions-create) command. 
Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --py myfunc.py \ + --className myfunc.SomeFunction \ + --inputs persistent://sample/standalone/ns1/input-1 \ + --output persistent://sample/standalone/ns1/output-1 +``` + +This command will upload `myfunc.py` to Pulsar, which will use the code to start one [or more](#parallelism) instances of the function. + +### Parallelism + +By default, only one **instance** of a Pulsar Function runs when you create and run it in [cluster run mode](#cluster-run). You can also, however, run multiple instances in parallel. You can specify the number of instances when you create the function, or update an existing single-instance function with a new parallelism factor. + +This command, for example, would create and run a function with a parallelism of 5 (i.e. 5 instances): + +```bash +$ bin/pulsar-admin functions create \ + --name parallel-fun \ +
[GitHub] lucperkins commented on issue #1487: Pulsar Functions counters example (WIP)
lucperkins commented on issue #1487: Pulsar Functions counters example (WIP) URL: https://github.com/apache/incubator-pulsar/pull/1487#issuecomment-385466420 I'm going to close this for now and re-open post 2.0
[GitHub] lucperkins commented on issue #1601: Pulsar Functions state storage documentation (WIP)
lucperkins commented on issue #1601: Pulsar Functions state storage documentation (WIP) URL: https://github.com/apache/incubator-pulsar/pull/1601#issuecomment-385466251 I'm going to close this for now and re-open post 2.0
[GitHub] sijie commented on issue #1352: Delete inactive subscriptions automatically
sijie commented on issue #1352: Delete inactive subscriptions automatically URL: https://github.com/apache/incubator-pulsar/pull/1352#issuecomment-385466352 @merlimat are we going to include this in 2.0?
[GitHub] lucperkins closed pull request #1601: Pulsar Functions state storage documentation (WIP)
lucperkins closed pull request #1601: Pulsar Functions state storage documentation (WIP)
URL: https://github.com/apache/incubator-pulsar/pull/1601

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index 68fe4263c4..a28a83af65 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -44,6 +44,8 @@ groups:
     endpoint: deployment
   - title: Processing guarantees
     endpoint: guarantees
+  - title: State storage
+    endpoint: state
   - title: Metrics for Pulsar Functions
     endpoint: metrics
diff --git a/site/docs/latest/functions/overview.md b/site/docs/latest/functions/overview.md
index c3cf23d65d..b795820b29 100644
--- a/site/docs/latest/functions/overview.md
+++ b/site/docs/latest/functions/overview.md
@@ -397,4 +397,9 @@ Pulsar Functions that use the [Pulsar Functions SDK](#sdk) can publish metrics t

 ## State storage

-Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}.
\ No newline at end of file
+Pulsar Functions are a great choice for performing stateless computations using Pulsar message data. But they can also be used for operations that do require state via the **state storage API**. For a more in-depth guide, see [State storage for Pulsar Functions](../state).
+
+{% include admonition.html type="info" content="Pulsar Function [counters](#counters) are simply a special case of Pulsar Function state storage." %}
+
+{% include admonition.html type="success" title="No need for an external database"
+content="Pulsar uses the [Apache BookKeeper](../../getting-started/ConceptsAndArchitecture#persistent-storage) log storage system for persistent storage of message data (and for other purposes). Pulsar Functions also use BookKeeper for state storage. This means that get state storage \"for free\" in Pulsar Functions, i.e. you get a fast, strongly consistent, highly available data storage system without needing to deploy your own." %}
\ No newline at end of file
diff --git a/site/docs/latest/functions/state.md b/site/docs/latest/functions/state.md
new file mode 100644
index 00..1187929e51
--- /dev/null
+++ b/site/docs/latest/functions/state.md
@@ -0,0 +1,74 @@
+---
+title: State storage for Pulsar Functions
+preview: true
+---
+
+Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}---thus, state storage comes "out of the box" for Pulsar Functions.
+
+## API
+
+The state storage API for Pulsar Functions is very simple and consists of just a handful of operations:
+
+Operation | Description
+:-|:
+Get value | Fetches the value associated with the specified key (if any)
+Put value | Updates the value associated with a given key
+Increment counter | Increments a specific counter specified by key (you can also decrement using negative increments)
+Get counter | Fetches the current value associated with a counter (or zero if the counter has never been used)
+
+## Deployment
+
+{% include admonition.html type="success" content="For most Pulsar installations, you won't need to specify an alternative state storage service URL. If you're just using Pulsar's built-in Apache BookKeeper storage system, Pulsar Functions running in both [local run](../deployment#local-run) and [cluster mode](../deployment#cluster-mode) can use state storage." %}
+
+## Java
+
+The state storage API for Java is provided by the {% javadoc Context client org.apache.pulsar.functions.api.Context %} class. Here's an example usage of the context object for state storage and retrieval:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+public class StateFunction implements Function<String, String> {
+    private static final String VALUE = "some value";
+
+    public String process(String key, Context context) {
+        Logger LOG = context.getLogger();
+
+        // Fetch the value associated with the same key
+        CompletableFuture<Optional<ByteBuffer>> futureValue = context.getValue(key);
+
+        // Extract the fetched value and return it as the function's output
+        futureValue.thenAccept(value -> {
+            if (value.isPresent()) {
+                String fetchedvalue = new String(value.get().array());
+
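
The four operations listed in the API table of the diff above (get value, put value, increment counter, get counter) can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: `InMemoryStateSketch` and its method names are hypothetical, it is backed by plain maps rather than BookKeeper, and it is not the `Context` API from the pull request. It mirrors only the semantics the table describes: a missing key yields no value, an unused counter reads as zero, and a negative increment decrements.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the four state operations.
// NOT the Pulsar Functions Context API; the names are illustrative only.
public class InMemoryStateSketch {
    private final Map<String, ByteBuffer> values = new ConcurrentHashMap<>();
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    // Put value: updates the value associated with a given key.
    public void putValue(String key, ByteBuffer value) {
        values.put(key, value.asReadOnlyBuffer());
    }

    // Get value: fetches the value for the key, or an empty Optional if none exists.
    // Returns an already-completed future to mimic an asynchronous lookup.
    public CompletableFuture<Optional<ByteBuffer>> getValue(String key) {
        Optional<ByteBuffer> found =
                Optional.ofNullable(values.get(key)).map(ByteBuffer::duplicate);
        return CompletableFuture.completedFuture(found);
    }

    // Increment counter: a negative amount decrements. Returns the new total.
    public long incrCounter(String key, long amount) {
        return counters.merge(key, amount, Long::sum);
    }

    // Get counter: zero if the counter has never been used.
    public long getCounter(String key) {
        return counters.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        InMemoryStateSketch state = new InMemoryStateSketch();

        state.putValue("greeting", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        String fetched = state.getValue("greeting").join()
                .map(buf -> StandardCharsets.UTF_8.decode(buf).toString())
                .orElse("<missing>");
        System.out.println(fetched);                    // prints "hello"

        state.incrCounter("hits", 3);
        state.incrCounter("hits", -1);
        System.out.println(state.getCounter("hits"));   // prints 2
        System.out.println(state.getCounter("unused")); // prints 0
    }
}
```

The sketch decodes the buffer with an explicit charset rather than calling `array()`, because `array()` throws on read-only buffers and ignores the buffer's position and limit.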
[GitHub] sijie commented on issue #1669: PIP-17: provide DataBlockHeader and implementation
sijie commented on issue #1669: PIP-17: provide DataBlockHeader and implementation
URL: https://github.com/apache/incubator-pulsar/pull/1669#issuecomment-385466192

moved this to 2.1
[GitHub] lucperkins commented on a change in pull request #1677: Add tenant and namespace getters to Python context
lucperkins commented on a change in pull request #1677: Add tenant and namespace getters to Python context
URL: https://github.com/apache/incubator-pulsar/pull/1677#discussion_r185049587

## File path: pulsar-client-cpp/python/functions/context.py ##

@@ -50,9 +50,18 @@ def get_message_id(self):
     pass

   @abstractmethod
-  def get_topic_name(self):
+  def get_input_topic_name(self):

Review comment:
I updated the corresponding Java method as well. I think that `getTopicName`/`get_topic_name` is slightly confusing given that there are so many topics in play. Happy to revert if you disagree.
[GitHub] sijie commented on issue #1656: Add admin api to delete topic forcefully
sijie commented on issue #1656: Add admin api to delete topic forcefully
URL: https://github.com/apache/incubator-pulsar/pull/1656#issuecomment-385443933

@merlimat : @rdhabalia has addressed the conflicts. can you review this PR?
[incubator-pulsar] branch master updated: Fix DefaultSchemasTest (#1682)
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 42c2f37 Fix DefaultSchemasTest (#1682)
42c2f37 is described below

commit 42c2f3712f24473845edc059242c629533c1596c
Author: Sijie Guo
AuthorDate: Mon Apr 30 08:29:32 2018 -0700

    Fix DefaultSchemasTest (#1682)

    *Motivation*

    #1585 introduced default schemas test. it wasn't rebased to latest master. so the PR was merged without problem but fail the build

    *Solution*

    Fix the test
---
 .../test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
index e565efb..ac54ef3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/DefaultSchemasTest.java
@@ -71,7 +71,7 @@ public class DefaultSchemasTest {
     }

     @Test
-    public void testStringSchema() {
+    public void testStringSchema() throws Exception {
         String testString = "hello world";
         byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
         StringSchema stringSchema = new StringSchema();