[incubator-pulsar] branch master updated: add auto ack and timeout configurable (#2503)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8e14175 add auto ack and timeout configurable (#2503) 8e14175 is described below commit 8e141752cbbbd038ecc183fd63b72ef19dfb32cc Author: Rajan Dhabalia AuthorDate: Tue Sep 18 21:19:05 2018 -0700 add auto ack and timeout configurable (#2503) * add auto ack and timeout configurable * Fix test --- .../java/org/apache/pulsar/admin/cli/CmdSinks.java| 19 ++- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 1 + .../org/apache/pulsar/functions/utils/SinkConfig.java | 3 +++ site2/docs/reference-pulsar-admin.md | 4 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 38a55bf..8f9eefe 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -291,6 +291,10 @@ public class CmdSinks extends CmdBase { protected String DEPRECATED_sinkConfigString; @Parameter(names = "--sink-config", description = "User defined configs key/values") protected String sinkConfigString; +@Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) +protected boolean autoAck = true; +@Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") +protected Long timeoutMs; protected SinkConfig sinkConfig; @@ -399,6 +403,15 @@ public class CmdSinks extends CmdBase { sinkConfig.setConfigs(parseConfigs(sinkConfigString)); } +sinkConfig.setAutoAck(autoAck); +if (timeoutMs != null) { +sinkConfig.setTimeoutMs(timeoutMs); +} + +if (null != sinkConfigString) { +sinkConfig.setConfigs(parseConfigs(sinkConfigString)); +} + inferMissingArguments(sinkConfig); } @@ -585,7 +598,11 @@ public class CmdSinks extends CmdBase { : SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); -functionDetailsBuilder.setAutoAck(true); +functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); +if (sinkConfig.getTimeoutMs() != null) { +sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); +} + functionDetailsBuilder.setSource(sourceSpecBuilder); // set up sink spec diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index e3ba70e..fb13d39 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -133,6 +133,7 @@ public class TestCmdSinks { sinkConfig.setTenant(TENANT); sinkConfig.setNamespace(NAMESPACE); sinkConfig.setName(NAME); +sinkConfig.setAutoAck(true); sinkConfig.setInputs(INPUTS_LIST); sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index be886c4..1132fa6 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -78,6 +78,9 @@ public class SinkConfig { private boolean retainOrdering; @isValidResources private Resources resources; +private boolean autoAck; +@isPositiveNumber +private Long timeoutMs; @isFileExists private String archive; diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 5b1fb41..42f6c9c 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1014,6 +1014,8 @@ Options |`--sink-type`|The built-in sinks's connector provider|| |`--topics-pattern`|TopicsPattern to consume from list of topics under a namespace that match the pattern.|| |`--tenant`|The sink’s tenant|| +|`--auto-ack`|Let the functions framework manage acking|| +|`--timeout-ms`|The message timeout in milliseconds|| ### `update` @@ -10
[incubator-pulsar] branch master updated: [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 49fc5e5 [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549) 49fc5e5 is described below commit 49fc5e508a996cfe59949effbcaf0abfa46028ce Author: Rajan Dhabalia AuthorDate: Tue Sep 18 15:22:12 2018 -0700 [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549) Fix: Compaction with last deleted keys not completing compaction Delete assignment with empty payload --- conf/functions_worker.yml | 2 + .../pulsar/compaction/TwoPhaseCompactor.java | 2 +- .../apache/pulsar/compaction/CompactionTest.java | 2 +- .../worker/PulsarWorkerAssignmentTest.java | 370 + .../proto/src/main/proto/Request.proto | 5 - .../functions/worker/FunctionAssignmentTailer.java | 47 ++- .../functions/worker/FunctionRuntimeManager.java | 283 +++- .../pulsar/functions/worker/SchedulerManager.java | 126 --- .../pulsar/functions/worker/WorkerConfig.java | 5 +- .../pulsar/functions/worker/WorkerService.java | 15 +- .../worker/scheduler/RoundRobinScheduler.java | 10 +- .../worker/FunctionRuntimeManagerTest.java | 40 +-- .../functions/worker/MembershipManagerTest.java| 3 + .../functions/worker/SchedulerManagerTest.java | 298 + 14 files changed, 791 insertions(+), 417 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 444b7fb..0c7b8af 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -44,6 +44,8 @@ rescheduleTimeoutMs: 6 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 3 +# Frequency how often worker performs compaction on function-topics +topicCompactionFrequencySec: 1800 metricsSamplingPeriodSec: 60 # Enforce authentication authenticationEnabled: false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 425e049..612b336 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -359,4 +359,4 @@ public class TwoPhaseCompactor extends Compactor { this.latestForKey = latestForKey; } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 32c93b4..8e7b292 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1268,4 +1268,4 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java new file mode 100644 index 000..fe28a51 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java
[incubator-pulsar] branch master updated: Add ledger op timeout to avoid topics stuck on ledger-creation (#2535)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d5e88c1 Add ledger op timeout to avoid topics stuck on ledger-creation (#2535) d5e88c1 is described below commit d5e88c1ec16df557655e42c9f648a2fd3343d759 Author: Rajan Dhabalia AuthorDate: Sun Sep 16 21:55:21 2018 -0700 Add ledger op timeout to avoid topics stuck on ledger-creation (#2535) * Add ledger op timeout to avoid topics stuck on ledger-creation * rename to metadataOperationsTimeoutSeconds * ad service config for managedLedgerMetadataOperationsTimeoutSeconds --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../bookkeeper/mledger/ManagedLedgerConfig.java| 21 +++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 101 +++--- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 ++--- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 47 ++- .../apache/pulsar/broker/ServiceConfiguration.java | 10 ++ .../pulsar/broker/service/BrokerService.java | 2 + 8 files changed, 234 insertions(+), 103 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 8927a85..2277cc8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 # corrupted at bookkeeper and managed-cursor is stuck at that ledger. autoSkipNonRecoverableData=false +# operation timeout while updating managed-ledger metadata. +managedLedgerMetadataOperationsTimeoutSeconds=60 + ### --- Load balancer --- ### # Enable load balancer diff --git a/conf/standalone.conf b/conf/standalone.conf index a68664c..cc8f564 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 # corrupted at bookkeeper and managed-cursor is stuck at that ledger. autoSkipNonRecoverableData=false +# operation timeout while updating managed-ledger metadata. +managedLedgerMetadataOperationsTimeoutSeconds=60 + ### --- Load balancer --- ### loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 698d245..5967453 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -56,6 +56,7 @@ public class ManagedLedgerConfig { private boolean autoSkipNonRecoverableData; private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4); private long offloadAutoTriggerSizeThresholdBytes = -1; +private long metadataOperationsTimeoutSeconds = 60; private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -511,4 +512,24 @@ public class ManagedLedgerConfig { this.clock = clock; return this; } + +/** + * + * Ledger-Op (Create/Delete) timeout + * + * @return + */ +public long getMetadataOperationsTimeoutSeconds() { +return metadataOperationsTimeoutSeconds; +} + +/** + * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure + * + * @param metadataOperationsTimeoutSeconds + */ +public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) { +this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds; +return this; +} } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 50b4722..0ac818f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1974,68 +1974,73 @@ public class ManagedCursorImpl implements ManagedCursor { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); -bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), -config.getMetadataAckQuorumSize(), digestType, config.getPassword(), (rc, lh, ctx) -> { -ledger.getExecutor().execute(safeRun(() -> { -ledger.mbean.endCursorLedgerCreateOp(); -if (rc != BKException.Code.OK) { -log.warn("[{}] Error creat
[incubator-pulsar] branch master updated: Delete temp file after submitting function (#2519)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ec9dfa8 Delete temp file after submitting function (#2519) ec9dfa8 is described below commit ec9dfa8e2ec0af29f6a29d1ce5817d245164c533 Author: Rajan Dhabalia AuthorDate: Wed Sep 5 10:37:52 2018 -0700 Delete temp file after submitting function (#2519) --- .../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java| 3 +-- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java| 3 +-- .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java | 3 +-- .../main/java/org/apache/pulsar/functions/worker/Utils.java| 10 +- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 99ef4dd..777ec93 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -501,8 +501,7 @@ public class CmdFunctions extends CmdBase { // download jar file if url is http or file is downloadable File tempPkgFile = null; try { -tempPkgFile = File.createTempFile(functionConfig.getName(), "function"); -downloadFromHttpUrl(functionConfig.getJar(), new FileOutputStream(tempPkgFile)); +tempPkgFile = downloadFromHttpUrl(functionConfig.getJar(), functionConfig.getName()); jarFilePath = tempPkgFile.getAbsolutePath(); } catch (Exception e) { if (tempPkgFile != null) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 3e49600..38a55bf 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -431,8 +431,7 @@ public class CmdSinks extends CmdBase { if(sinkConfig.getArchive().startsWith(Utils.HTTP)) { File tempPkgFile = null; try { -tempPkgFile = File.createTempFile(sinkConfig.getName(), "sink"); -downloadFromHttpUrl(sinkConfig.getArchive(), new FileOutputStream(tempPkgFile)); +tempPkgFile = downloadFromHttpUrl(sinkConfig.getArchive(), sinkConfig.getName()); archivePath = tempPkgFile.getAbsolutePath(); } catch(Exception e) { if(tempPkgFile!=null ) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index db00b9f..fb32a6a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -388,8 +388,7 @@ public class CmdSources extends CmdBase { if(sourceConfig.getArchive().startsWith(Utils.HTTP)) { File tempPkgFile = null; try { -tempPkgFile = File.createTempFile(sourceConfig.getName(), "source"); -downloadFromHttpUrl(sourceConfig.getArchive(), new FileOutputStream(tempPkgFile)); +tempPkgFile = downloadFromHttpUrl(sourceConfig.getArchive(), sourceConfig.getName()); archivePath = tempPkgFile.getAbsolutePath(); } catch(Exception e) { if(tempPkgFile!=null ) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index ec99b00..6487dcf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -51,6 +51,7 @@ import org.apache.pulsar.functions.worker.dlog.DLOutputStream; import org.apache.zookeeper.KeeperException.Code; import org.apache.pulsar.functions.proto.Function; import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.functions.worker.Utils.downloadFromHttpUrl; @Slf4j public final class Utils { @@ -159,7 +160,14 @@ public final class Utils { ReadableByteChannel rbc = Channels.newChannel(website.openStream()); outputStream.ge
[incubator-pulsar] branch master updated: Log and Return error-message on function-stats failure (#2517)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2ae20e2 Log and Return error-message on function-stats failure (#2517) 2ae20e2 is described below commit 2ae20e2f55ccacff1a6201986a5c3f2e7a0ca6bc Author: Rajan Dhabalia AuthorDate: Wed Sep 5 10:37:30 2018 -0700 Log and Return error-message on function-stats failure (#2517) --- .../apache/pulsar/functions/worker/rest/api/FunctionsImpl.java| 8 ++-- 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index e9c1276..136bab0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -360,12 +360,8 @@ public class FunctionsImpl { functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, Integer.parseInt(instanceId)); } catch (Exception e) { -log.error("Got Exception Getting Status", e); -FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); -functionStatusBuilder.setRunning(false); -String functionDetailsJson = org.apache.pulsar.functions.utils.Utils -.printJson(functionStatusBuilder.build()); -return Response.status(Status.OK).entity(functionDetailsJson).build(); +log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e); +return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); } String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatus);
[incubator-pulsar] branch master updated: Add CLI to get function status of a specific instance (#2518)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 10ce8ec Add CLI to get function status of a specific instance (#2518) 10ce8ec is described below commit 10ce8ec603c64581f086497cd8192538ba6cdd12 Author: Rajan Dhabalia AuthorDate: Wed Sep 5 10:37:14 2018 -0700 Add CLI to get function status of a specific instance (#2518) * Add CLI to get function status of a specific instance * fix condition and test --- .../org/apache/pulsar/client/admin/Functions.java| 20 +++- .../pulsar/client/admin/internal/FunctionsImpl.java | 20 .../apache/pulsar/admin/cli/CmdFunctionsTest.java| 16 .../org/apache/pulsar/admin/cli/CmdFunctions.java| 9 - site2/docs/reference-pulsar-admin.md | 2 +- 5 files changed, 64 insertions(+), 3 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 96e9cbc..9a18460 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.functions.worker.WorkerInfo; @@ -181,7 +182,24 @@ public interface Functions { * Unexpected error */ FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException; - + +/** + * Gets the current status of a function instance. + * + * @param tenant + *Tenant name + * @param namespace + *Namespace name + * @param function + *Function name + * @param id + *Function instance-id + * @return + * @throws PulsarAdminException + */ +FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) +throws PulsarAdminException; + /** * Restart function instance * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 4552eba..7dc7050 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; import org.apache.pulsar.functions.worker.WorkerInfo; import org.glassfish.jersey.media.multipart.FormDataBodyPart; @@ -110,6 +111,25 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override +public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) +throws PulsarAdminException { +try { +Response response = request( + functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) +.get(); +if (!response.getStatusInfo().equals(Response.Status.OK)) { +throw new ClientErrorException(response); +} +String jsonResponse = response.readEntity(String.class); +FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); +mergeJson(jsonResponse, functionStatusBuilder); +return functionStatusBuilder.build(); +} catch (Exception e) { +throw getApiException(e); +} +} + +@Override public void createFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDat
[incubator-pulsar] branch master updated: Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 143979d Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498) 143979d is described below commit 143979d15525520edeaededd6b7878b61f5c8bfe Author: Rajan Dhabalia AuthorDate: Fri Aug 31 15:49:09 2018 -0700 Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498) --- .../pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 0fefe9c..9a87d26 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -57,6 +57,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * Latest cumulative ack sent to broker */ private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest; +private volatile boolean cumulativeAckFulshRequired = false; private static final AtomicReferenceFieldUpdater LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck"); @@ -119,6 +120,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (msgId.compareTo(lastCumlativeAck) > 0) { if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) { // Successfully updated the last cumlative ack. Next flush iteration will send this to broker. +cumulativeAckFulshRequired = true; return; } } else { @@ -160,10 +162,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments return; } -if (!lastCumulativeAck.equals(MessageId.earliest)) { +if (cumulativeAckFulshRequired) { ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, AckType.Cumulative, null, Collections.emptyMap()); cnx.ctx().write(cmd, cnx.ctx().voidPromise()); +cumulativeAckFulshRequired = false; } // Flush all individual acks
[incubator-pulsar] branch master updated: add sub-name option to function cli (#2492)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 630fc0d add sub-name option to function cli (#2492) 630fc0d is described below commit 630fc0db6d4fb0af43675c8d9f6f8c93fb209552 Author: Rajan Dhabalia AuthorDate: Fri Aug 31 15:09:18 2018 -0700 add sub-name option to function cli (#2492) * add sub-name option to function cli * add docs --- .../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 11 +++ .../org/apache/pulsar/functions/utils/FunctionConfig.java | 1 + site2/docs/reference-pulsar-admin.md | 3 +++ 3 files changed, 15 insertions(+) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index e567814..7f2f521 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -24,6 +24,7 @@ import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.fileExists; @@ -280,6 +281,8 @@ public class CmdFunctions extends CmdBase { protected Boolean DEPRECATED_retainOrdering; @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; +@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") +protected String subsName; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)") protected Integer parallelism; @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") @@ -406,6 +409,10 @@ public class CmdFunctions extends CmdBase { } functionConfig.setRetainOrdering(retainOrdering); + +if (isNotBlank(subsName)) { +functionConfig.setSubName(subsName); +} if (null != userConfigString) { Type type = new TypeToken>(){}.getType(); @@ -666,6 +673,10 @@ public class CmdFunctions extends CmdBase { ? SubscriptionType.FAILOVER : SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); + +if (isNotBlank(functionConfig.getSubName())) { + sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); +} if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 21e6112..1335f8c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -107,6 +107,7 @@ public class FunctionConfig { private Map userConfig; private Runtime runtime; private boolean autoAck; +private String subName; @isPositiveNumber private int parallelism; @isValidResources diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index eddee17..03e1472 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -307,6 +307,7 @@ Options |`--ram`|The RAM to allocate to each function instance (in bytes)|| |`--disk`|The disk space to allocate to each function instance (in bytes)|| |`--auto-ack`|Let the functions framework manage acking|| +|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer|| |`--broker-service-url `|The URL of the Pulsar broker|| |`--classname`|The name of the function’s class|| |`--custom-serde-inputs`|A map of the input topic to SerDe name|| @@ -352,6 +353,7 @@ Options |`--
[incubator-pulsar] branch master updated: Race condition function-runtime-manager read old assignments (#2437)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 672a167 Race condition function-runtime-manager read old assignments (#2437) 672a167 is described below commit 672a167a31c719dab7623770d39666e8d7e7a0fd Author: Rajan Dhabalia AuthorDate: Thu Aug 23 18:34:44 2018 -0700 Race condition function-runtime-manager read old assignments (#2437) --- .../apache/pulsar/client/impl/ConsumerBase.java| 4 .../pulsar/functions/worker/MembershipManager.java | 26 ++ .../pulsar/functions/worker/WorkerService.java | 2 +- .../functions/worker/MembershipManagerTest.java| 24 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 016324e..a6fda6f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -335,6 +335,10 @@ public abstract class ConsumerBase extends HandlerState implements Consumer consumer; +private final ConsumerImpl consumer; private final WorkerConfig workerConfig; private PulsarAdmin pulsarAdminClient; private final CompletableFuture firstConsumerEventFuture; @@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { @VisibleForTesting Map unsignedFunctionDurations = new HashMap<>(); -MembershipManager(WorkerConfig workerConfig, PulsarClient client) +MembershipManager(WorkerService service, PulsarClient client) throws PulsarClientException { -this.workerConfig = workerConfig; +this.workerConfig = service.getWorkerConfig(); consumerName = String.format( "%s:%s:%d", workerConfig.getWorkerId(), @@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { // we don't produce any messages into this topic, we only use the `failover` subscription // to elect an active consumer as the leader worker. The leader worker will be responsible // for scheduling snapshots for FMT and doing task assignment. -consumer = client.newConsumer() +consumer = (ConsumerImpl) client.newConsumer() .topic(workerConfig.getClusterCoordinationTopic()) .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) .subscriptionType(SubscriptionType.Failover) .consumerEventListener(this) .property(WORKER_IDENTIFIER, consumerName) .subscribe(); + +isLeader.set(checkLeader(service, consumer.getConsumerName())); } @Override @@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { return this.pulsarAdminClient; } +private boolean checkLeader(WorkerService service, String consumerName) { +try { +TopicStats stats = service.getBrokerAdmin().topics() + .getStats(service.getWorkerConfig().getClusterCoordinationTopic()); +String activeConsumerName = stats != null +&& stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null +? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName +: null; +return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName); +} catch (Exception e) { +log.warn("Failed to check leader {}", e.getMessage()); +} +return false; +} + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0850766..7fc0cc9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -137,7 +137,7 @@ public class WorkerService { this.connectorsManager = new ConnectorsManager(workerConfig); //create membership manager -this.membershipManager = new MembershipManager(this.workerConfig, this.client); +this.membershipManager = new MembershipManager(this, this.client); // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/funct
[incubator-pulsar] branch master updated: Fix: authorization while redirecting function admin call (#2416)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new cc60027 Fix: authorization while redirecting function admin call (#2416) cc60027 is described below commit cc60027c8dd74469d6e0438ef8df8ea06ada6f2d Author: Rajan Dhabalia AuthorDate: Wed Aug 22 10:53:30 2018 -0700 Fix: authorization while redirecting function admin call (#2416) --- .../pulsar/broker/admin/impl/FunctionsBase.java| 4 +- .../apache/pulsar/io/PulsarFunctionTlsTest.java| 2 +- .../functions/worker/FunctionRuntimeManager.java | 45 -- .../pulsar/functions/worker/WorkerConfig.java | 7 +++- .../pulsar/functions/worker/WorkerService.java | 24 +--- .../functions/worker/rest/FunctionApiResource.java | 3 ++ .../functions/worker/rest/api/FunctionsImpl.java | 17 .../worker/rest/api/v2/FunctionApiV2Resource.java | 5 ++- .../worker/FunctionRuntimeManagerTest.java | 18 +++-- .../functions/worker/MembershipManagerTest.java| 19 +++-- 10 files changed, 90 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 4384f50..315027f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -277,7 +277,7 @@ public class FunctionsBase extends AdminResource implements Supplier admins = Sets.newHashSet("superUser"); TenantInfo tenantInfo = new TenantInfo(admins, null); when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 1016171..93828de 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; @@ -45,6 +46,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.Response.Status; import java.util.Collection; @@ -91,15 +93,15 @@ public class FunctionRuntimeManager implements AutoCloseable{ private MembershipManager membershipManager; private final ConnectorsManager connectorsManager; -public FunctionRuntimeManager(WorkerConfig workerConfig, - PulsarClient pulsarClient, - Namespace dlogNamespace, - MembershipManager membershipManager, - ConnectorsManager connectorsManager) throws Exception { +private final PulsarAdmin functionAdmin; + +public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, +MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; this.connectorsManager = connectorsManager; +this.functionAdmin = workerService.getFunctionAdmin(); -Reader reader = pulsarClient.newReader() +Reader reader = workerService.getClient().newReader() .topic(this.workerConfig.getFunctionAssignmentTopic()) .startMessageId(MessageId.earliest) .create(); @@ -327,7 +329,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ } public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId, -boolean restart) throws Exception { +boolean restart, URI uri) throws Exception { Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId); if (assignment == null) { @@ -355,19 +357,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ .entity(new ErrorData(fullFunctionName + " h
[incubator-pulsar] branch master updated: Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ad5fc83 Kinesis-sink consider topic-name as partition-key if record key empty (#2372) ad5fc83 is described below commit ad5fc8384bc38ff79637c7bd316b41e76f7bc27a Author: Rajan Dhabalia AuthorDate: Tue Aug 14 15:41:45 2018 -0700 Kinesis-sink consider topic-name as partition-key if record key empty (#2372) --- .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 4dda58f..1056f57 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -114,7 +114,7 @@ public class KinesisSink implements Sink { record.getRecordSequence()); throw new IllegalStateException("kinesis queue has publish failure"); } -String partitionedKey = record.getKey().orElse(defaultPartitionedKey); +String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey)); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256
[incubator-pulsar] branch master updated: Add support to restart function (#2365)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7bcd893 Add support to restart function (#2365) 7bcd893 is described below commit 7bcd8934a0a53ab7a62b3c9d77fbdec94ab497d2 Author: Rajan Dhabalia AuthorDate: Mon Aug 13 22:49:11 2018 -0700 Add support to restart function (#2365) * Add support to restart function fix: pulsar function restart * add support to restart all function instances --- .../pulsar/broker/admin/impl/FunctionsBase.java| 25 + .../apache/pulsar/io/PulsarFunctionTlsTest.java| 2 +- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 63 +++- .../org/apache/pulsar/client/admin/Functions.java | 33 +++ .../client/admin/internal/FunctionsImpl.java | 22 + .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 29 ++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 37 ++- .../pulsar/functions/worker/FunctionActioner.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 107 + .../functions/worker/rest/api/FunctionsImpl.java | 66 + .../worker/rest/api/v2/FunctionApiV2Resource.java | 28 ++ 11 files changed, 404 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 564eb18..b8891e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -268,6 +268,31 @@ public class FunctionsBase extends AdminResource implements Supplier typeArg = byte[].class; FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); @@ -446,7 +446,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, -sinkTopic, subscriptionName); +"my.*", sinkTopic, subscriptionName); try { admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); assertTrue(validRoleName); @@ -507,4 +507,57 @@ public class PulsarSinkE2ETest { assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName()); } + +@Test(timeOut = 2) +public void testFunctionRestartApi() throws Exception { + +final String namespacePortion = "io"; +final String replNamespace = tenant + "/" + namespacePortion; +final String sourceTopicName = "restartFunction"; +final String sourceTopic = "persistent://" + replNamespace + "/" + sourceTopicName; +final String sinkTopic = "persistent://" + replNamespace + "/output"; +final String functionName = "PulsarSink-test"; +final String subscriptionName = "test-sub"; +admin.namespaces().createNamespace(replNamespace); +Set clusters = Sets.newHashSet(Lists.newArrayList("use")); +admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + +// create source topic +Producer producer = pulsarClient.newProducer().topic(sourceTopic).create(); + +String jarFilePathUrl = Utils.FILE + ":" ++ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); +FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, +sourceTopicName, sinkTopic, subscriptionName); +admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + +retryStrategically((test) -> { +try { +SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); +return subStats != null && subStats.consumers.size() == 1; +} catch (PulsarAdminException e) { +return false; +} +}, 5, 150); + +SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); +assertEquals(subStats.consumers.size(), 1); + +// it should restart consumer : so, check if consumer came up again after restarting function +admin.functions().restartFunction(tenant, namespacePortion, functionName); + +retryStrategically((test)
[incubator-pulsar] branch master updated: Set correct exception if function runnable fails with Error (#2353)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new c9988cf Set correct exception if function runnable fails with Error (#2353) c9988cf is described below commit c9988cff24aa12e94c2a09cd112ab2bfa0a8860b Author: Rajan Dhabalia AuthorDate: Mon Aug 13 21:20:02 2018 -0700 Set correct exception if function runnable fails with Error (#2353) * Set correct exception if function runnable fails with Error * fix throwable set --- .../org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index aea9d53..fe94b17 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -185,8 +185,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { */ @Override public void run() { +String functionName = null; try { ContextImpl contextImpl = setupContext(); +functionName = String.format("%s-%s", contextImpl.getTenant(), contextImpl.getFunctionName()); javaInstance = setupJavaInstance(contextImpl); if (null != stateTable) { StateContextImpl stateContext = new StateContextImpl(stateTable); @@ -233,8 +235,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } } catch (Throwable t) { -log.error("Uncaught exception in Java Instance", t); -deathException = (Exception) t; +log.error("[{}] Uncaught exception in Java Instance", functionName, t); +deathException = t; return; } finally { log.info("Closing instance");
[incubator-pulsar] branch master updated: Add worker specific system and jvm metrics (#2352)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 10c3511 Add worker specific system and jvm metrics (#2352) 10c3511 is described below commit 10c351134a36457d3734a356b798b9f18b8d8593 Author: Rajan Dhabalia AuthorDate: Fri Aug 10 00:21:55 2018 -0700 Add worker specific system and jvm metrics (#2352) --- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 11 ++- .../broker/loadbalance/impl/LoadManagerShared.java | 4 +-- .../pulsar/broker/stats/MetricsGenerator.java | 4 +-- .../prometheus/PrometheusMetricsGenerator.java | 4 +-- .../stats/BookieClientsStatsGeneratorTest.java | 2 +- .../apache/pulsar/client/admin/WorkerStats.java| 9 ++ .../client/admin/internal/WorkerStatsImpl.java | 18 ++- .../pulsar/admin/cli/CmdFunctionWorkerStats.java | 24 +++--- .../apache/pulsar/common/stats}/JvmMetrics.java| 32 +-- .../org/apache/pulsar/common/stats/Metrics.java| 5 +++ .../pulsar/functions/worker/MetricsGenerator.java | 37 ++ .../pulsar/functions/worker/WorkerService.java | 3 ++ .../functions/worker/rest/api/FunctionsImpl.java | 21 +++- .../functions/worker/rest/api/v2/WorkerStats.java | 17 +++--- 14 files changed, 150 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index 962c483..5a0e4b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin.v2; import java.io.IOException; +import java.util.Collection; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -41,7 +42,15 @@ public class WorkerStats extends FunctionApiResource { @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 503, message = "Worker service is not running") }) -public Response getMetrics() throws IOException { +public Response getStats() throws IOException { return functions.getFunctionsMetrcis(clientAppId()); } + +@GET +@Path("/metrics") +@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") +@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) +public Collection getMetrics() throws Exception { +return functions.getWorkerMetrcis(clientAppId()); +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 80cb31c..6a5575c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -41,7 +41,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.broker.loadbalance.LoadData; -import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -230,7 +230,7 @@ public class LoadManagerShared { systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / MIBI; // Collect JVM direct memory -systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI); +systemResourceUsage.directMemory.usage = (double) (getJvmDirectMemoryUsed() / MIBI); systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI); return systemResourceUsage; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/MetricsGenerator.java index
[incubator-pulsar] branch master updated: REST and CLI to get function metrics in json for monitoring (#2296)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 20b41de REST and CLI to get function metrics in json for monitoring (#2296) 20b41de is described below commit 20b41de73912ac1f82280d70a291684cb30d8151 Author: Rajan Dhabalia AuthorDate: Wed Aug 8 16:57:22 2018 -0700 REST and CLI to get function metrics in json for monitoring (#2296) * REST and CLI to get function metrics in json for monitoring * add worker-stats end-point --- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 47 ++ .../org/apache/pulsar/client/admin/Functions.java | 1 + .../apache/pulsar/client/admin/PulsarAdmin.java| 11 +++ .../apache/pulsar/client/admin/WorkerStats.java| 35 +++ .../client/admin/internal/FunctionsImpl.java | 2 +- .../client/admin/internal/WorkerStatsImpl.java | 58 .../pulsar/admin/cli/CmdFunctionWorkerStats.java | 79 .../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../apache/pulsar/admin/cli/PulsarAdminTool.java | 1 + .../src/main/proto/InstanceCommunication.proto | 9 ++ .../pulsar/functions/worker/rest/Resources.java| 2 + .../functions/worker/rest/api/FunctionsImpl.java | 103 + .../functions/worker/rest/api/v2/WorkerStats.java | 54 +++ .../org/apache/pulsar/io/kinesis/KinesisSink.java | 4 +- site2/website/scripts/replace.js | 3 +- 15 files changed, 386 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java new file mode 100644 index 000..962c483 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v2; + +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Path("/worker-stats") +public class WorkerStats extends FunctionApiResource { + +@GET +@Path("/functions") +@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 503, message = "Worker service is not running") }) +public Response getMetrics() throws IOException { +return functions.getFunctionsMetrcis(clientAppId()); +} +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 293304f..c04873d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.functions.worker.WorkerInfo; /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-
[incubator-pulsar] branch master updated: Avoid creating output topic on tenant namespace if output-topic not provided (#2261)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 14765b2 Avoid creating output topic on tenant namespace if output-topic not provided (#2261) 14765b2 is described below commit 14765b2c4b452dcc3a0d1a4fd488142fd5111a93 Author: Rajan Dhabalia AuthorDate: Tue Aug 7 01:06:13 2018 -0700 Avoid creating output topic on tenant namespace if output-topic not provided (#2261) * Avoid creating output topic on tenant namespace if output-topic not provided fix test add flag to skip output topic rename skip-output cmd * fix test --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 31 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 24 --- .../functions/instance/JavaInstanceRunnable.java | 23 +- .../pulsar/functions/sink/PulsarSinkDisable.java | 49 ++ .../pulsar/functions/utils/FunctionConfig.java | 2 +- 5 files changed, 100 insertions(+), 29 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index edfb1c4..e206e75 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -76,6 +76,7 @@ import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNull; /** * Unit test of {@link CmdFunctions}. @@ -434,11 +435,12 @@ public class CmdFunctionsTest { } @Test -public void testCreateWithoutOutputTopic() throws Exception { +public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception { String inputTopicName = TEST_NAME + "-input-topic"; cmd.run(new String[] { "create", "--inputs", inputTopicName, +"--skip-output", "--jar", "SomeJar.jar", "--tenant", "sample", "--namespace", "ns1", @@ -446,8 +448,33 @@ public class CmdFunctionsTest { }); CreateFunction creater = cmd.getCreater(); -assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput()); +assertNull(creater.getFunctionConfig().getOutput()); verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString()); + +} + + +@Test +public void testCreateWithoutOutputTopic() throws Exception { + +ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); +consoleOutputCapturer.start(); + +String inputTopicName = TEST_NAME + "-input-topic"; +cmd.run(new String[] { +"create", +"--inputs", inputTopicName, +"--jar", "SomeJar.jar", +"--tenant", "sample", +"--namespace", "ns1", +"--className", DummyFunction.class.getName(), +}); + +CreateFunction creater = cmd.getCreater(); +consoleOutputCapturer.stop(); +String output = consoleOutputCapturer.getStderr(); +assertNull(creater.getFunctionConfig().getOutput()); +assertTrue(output.contains("output topic is not present")); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 23a6cb8..a4152a4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -233,8 +233,10 @@ public class CmdFunctions extends CmdBase { protected String DEPRECATED_topicsPattern; @Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)") protected String topicsPattern; -@Parameter(names = "--output", description = "The function's output topic") +@Parameter(
[incubator-pulsar] branch master updated: kinesis-sink: manage msg ordering for publish callback failure (#2285)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d2f6dd9 kinesis-sink: manage msg ordering for publish callback failure (#2285) d2f6dd9 is described below commit d2f6dd906292078e68376d8d5b995903e58138de Author: Rajan Dhabalia AuthorDate: Thu Aug 2 12:35:48 2018 -0700 kinesis-sink: manage msg ordering for publish callback failure (#2285) --- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 49 ++ .../pulsar/io/kinesis/KinesisSinkConfig.java | 1 + 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index dc70b98..67de21a 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -44,6 +44,7 @@ import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -89,6 +90,12 @@ public class KinesisSink implements Sink { private static final String defaultPartitionedKey = "default"; private static final int maxPartitionedKeyLength = 256; private SinkContext sinkContext; +// +private static final int FALSE = 0; +private static final int TRUE = 1; +private volatile int previousPublishFailed = FALSE; +private static final AtomicIntegerFieldUpdater IS_PUBLISH_FAILED = +AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed"); public static final String ACCESS_KEY_NAME = "accessKey"; public static final String SECRET_KEY_NAME = "secretKey"; @@ -101,6 +108,12 @@ public class KinesisSink implements Sink { @Override public void write(Record record) throws Exception { +// kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering +if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) { +LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName, +record.getRecordSequence()); +throw new IllegalStateException("kinesis queue has publish failure"); +} String partitionedKey = record.getKey().orElse(defaultPartitionedKey); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) @@ -109,7 +122,7 @@ public class KinesisSink implements Sink { ListenableFuture addRecordResult = kinesisProducer.addUserRecord(this.streamName, partitionedKey, data); addCallback(addRecordResult, -ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext), directExecutor()); +ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor()); if (sinkContext != null) { sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1); sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length); @@ -151,6 +164,7 @@ public class KinesisSink implements Sink { this.streamName = kinesisSinkConfig.getAwsKinesisStreamName(); this.kinesisProducer = new KinesisProducer(kinesisConfig); +IS_PUBLISH_FAILED.set(this, FALSE); LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig, ToStringStyle.SHORT_PREFIX_STYLE))); } @@ -167,30 +181,26 @@ public class KinesisSink implements Sink { private static final class ProducerSendCallback implements FutureCallback { private Record resultContext; -private String streamName; private long startTime = 0; private final Handle recyclerHandle; -private SinkContext sinkContext; +private KinesisSink kinesisSink; private ProducerSendCallback(Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } -static ProducerSendCallback create(String streamName, Record resultContext, long startTime, -SinkContext sinkContext) { +static ProducerSendCallback create(KinesisSink kinesisSink, Record resultContext, long startTime) { ProducerSendCallback sendCallback = RECYCLER.get(); sendCallback
[incubator-pulsar] branch master updated: Fix: set subscription-type based on message ordering (#2259)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new c85dc46 Fix: set subscription-type based on message ordering (#2259) c85dc46 is described below commit c85dc46638cbc7822872511c776777897ce1895c Author: Rajan Dhabalia AuthorDate: Thu Aug 2 12:36:04 2018 -0700 Fix: set subscription-type based on message ordering (#2259) * Fix: set subscription-type based on message ordering * set failover sub on EFFECTIVELY_ONCE processing guarantee --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 31 +- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 11 +--- .../pulsar/functions/utils/FunctionConfig.java | 1 + 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 6948cff..9f7405c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -86,6 +86,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.WindowConfig; +import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator; import org.apache.pulsar.functions.windowing.WindowFunctionExecutor; @@ -240,6 +241,8 @@ public class CmdFunctions extends CmdBase { protected FunctionConfig.ProcessingGuarantees processingGuarantees; @Parameter(names = "--userConfig", description = "User-defined config key/values") protected String userConfigString; +@Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order") +protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. the number of function instances to run)") protected Integer parallelism; @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") @@ -315,6 +318,9 @@ public class CmdFunctions extends CmdBase { if (null != processingGuarantees) { functionConfig.setProcessingGuarantees(processingGuarantees); } + +functionConfig.setRetainOrdering(retainOrdering); + if (null != userConfigString) { Type type = new TypeToken>(){}.getType(); Map userConfigMap = new Gson().fromJson(userConfigString, type); @@ -567,24 +573,13 @@ public class CmdFunctions extends CmdBase { sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern()); } -// Set subscription type based on processing semantics -if (functionConfig.getProcessingGuarantees() != null) { -switch (functionConfig.getProcessingGuarantees()) { -case ATMOST_ONCE: - sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED); -break; -case ATLEAST_ONCE: - sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED); -break; -case EFFECTIVELY_ONCE: - sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER); -break; -default: -throw new RuntimeException("Unknown processing guarantee: " -+ functionConfig.getProcessingGuarantees().name()); -} -} - +// Set subscription type based on ordering and EFFECTIVELY_ONCE semantics +SubscriptionType subType = (functionConfig.isRetainOrdering() +|| ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) +? SubscriptionType.FAILOVER +: SubscriptionType.SHARED; +sourceSpecBuilder.setSubscriptionType(subType); + if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pu
[incubator-pulsar] branch master updated: handle subscription-already-exist exception on partitioned-topic for create-sub admin-api (#2269)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new c7436fe handle subscription-already-exist exception on partitioned-topic for create-sub admin-api (#2269) c7436fe is described below commit c7436fecb7a567a253a84451244b6ea50cf286fe Author: Rajan Dhabalia AuthorDate: Tue Jul 31 11:33:02 2018 -0700 handle subscription-already-exist exception on partitioned-topic for create-sub admin-api (#2269) --- .../broker/admin/impl/PersistentTopicsBase.java| 35 +- .../broker/admin/CreateSubscriptionTest.java | 26 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index cf8355f..f85c617 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -38,9 +38,11 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; @@ -829,15 +831,34 @@ public class PersistentTopicsBase extends AdminResource { try { if (partitionMetadata.partitions > 0) { // Create the subscription on each partition -List> futures = Lists.newArrayList(); PulsarAdmin admin = pulsar().getAdminClient(); +CountDownLatch latch = new CountDownLatch(partitionMetadata.partitions); +AtomicReference exception = new AtomicReference<>(); +AtomicInteger failureCount = new AtomicInteger(0); + for (int i = 0; i < partitionMetadata.partitions; i++) { - futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(), -subscriptionName, messageId)); +admin.persistentTopics() + .createSubscriptionAsync(topicName.getPartition(i).toString(), subscriptionName, messageId) +.handle((result, ex) -> { +if (ex != null) { +int c = failureCount.incrementAndGet(); +// fail the operation on unknown exception or if all the partitioned failed due to +// subscription-already-exist +if (c == partitionMetadata.partitions +|| !(ex instanceof PulsarAdminException.ConflictException)) { +exception.set(ex); +} +} +latch.countDown(); +return null; +}); } -FutureUtil.waitForAll(futures).join(); +latch.await(); +if (exception.get() != null) { +throw exception.get(); +} } else { validateAdminOperationOnTopic(authoritative); @@ -850,10 +871,10 @@ public class PersistentTopicsBase extends AdminResource { PersistentSubscription subscription = (PersistentSubscription) topic .createSubscription(subscriptionName, InitialPosition.Latest).get(); subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); -log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), -topicName, subscriptionName, messageId); +log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, +subscriptionName, messageId); } -} catch (Exception e) { +} catch (Throwable e) { Throwable t = e.getCause(); log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, messageId, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-bro
[incubator-pulsar] branch master updated: Derive source/sink arg-class name from function-class for file-url (#2258)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new a59f988 Derive source/sink arg-class name from function-class for file-url (#2258) a59f988 is described below commit a59f9884865d7430b1bc762444207358b16618a5 Author: Rajan Dhabalia AuthorDate: Tue Jul 31 11:32:49 2018 -0700 Derive source/sink arg-class name from function-class for file-url (#2258) * Derive source/sink arg-class name from functio-class for file-url archive * fix set type-arg if src/sink arg-class is not set * add unit-test --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 99 -- .../org/apache/pulsar/functions/utils/Utils.java | 15 ++-- .../functions/worker/rest/api/FunctionsImpl.java | 42 ++--- 3 files changed, 113 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 16f1a76..5398bc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -21,6 +21,10 @@ package org.apache.pulsar.io; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import java.io.File; import java.lang.reflect.Method; @@ -62,7 +66,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; @@ -70,11 +73,9 @@ import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; -import org.apache.pulsar.functions.worker.rest.WorkerServer; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -124,7 +125,7 @@ public class PulsarSinkE2ETest { public Object[][] validRoleName() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - + @BeforeMethod void setup(Method method) throws Exception { @@ -147,7 +148,6 @@ public class PulsarSinkE2ETest { config.setBrokerServicePortTls(brokerServiceTlsPort); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); - Set providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); @@ -156,7 +156,6 @@ public class PulsarSinkE2ETest { config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); - functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); @@ -190,12 +189,12 @@ public class PulsarSinkE2ETest { workerConfig.getClientAuthenticationParameters()); } pulsarClient = clientBuilder.build(); - + TenantInfo propAdmin = new TenantInfo(); propAdmin.getAdminRoles().add("superUser"); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); - + Thread.sleep(100); } @@ -237,7 +236,7 @@ public class PulsarSinkE2ETest { workerConfig.setUseTls(true); workerConfig.setTlsAllowInsecureConnection(true); workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); - + workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); @@ -285,7 +284,7 @@ public class PulsarSinkE2ETest { } }, 5, 150); // validate pulsar sink consume
[incubator-pulsar] branch master updated: Set empty response to complete grpc txn (#2225)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 55c8dc0 Set empty response to complete grpc txn (#2225) 55c8dc0 is described below commit 55c8dc0519608a755509cdc445496140b38d98b3 Author: Rajan Dhabalia AuthorDate: Wed Jul 25 13:07:52 2018 -0700 Set empty response to complete grpc txn (#2225) --- .../java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 0c10fdb..966740d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -370,9 +370,10 @@ public class JavaInstanceMain implements AutoCloseable { if (runtime != null) { try { runtime.resetMetrics().get(); + responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (InterruptedException | ExecutionException e) { -log.error("Exception in JavaInstance doing getAndResetMetrics", e); +log.error("Exception in JavaInstance doing resetMetrics", e); throw new RuntimeException(e); } }
[incubator-pulsar] branch master updated: Add authorization support on function apis (#2213)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ce6fe8b Add authorization support on function apis (#2213) ce6fe8b is described below commit ce6fe8b9b757c505afafd8f209f673ec86733aa7 Author: Rajan Dhabalia AuthorDate: Mon Jul 23 20:18:25 2018 -0700 Add authorization support on function apis (#2213) * Add authorization support on function apis * fix authorization enable check --- conf/functions_worker.yml | 8 +++ .../pulsar/broker/admin/impl/FunctionsBase.java| 11 ++-- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 41 ++- pulsar-functions/worker/pom.xml| 6 +++ .../pulsar/functions/worker/WorkerConfig.java | 22 +++- .../pulsar/functions/worker/WorkerService.java | 17 +- .../functions/worker/rest/FunctionApiResource.java | 12 + .../pulsar/functions/worker/rest/WorkerServer.java | 15 ++ .../functions/worker/rest/api/FunctionsImpl.java | 61 -- .../worker/rest/api/v2/FunctionApiV2Resource.java | 14 +++-- .../rest/api/v2/FunctionApiV2ResourceTest.java | 25 + 11 files changed, 203 insertions(+), 29 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 58bcf1d..4194337 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 3 metricsSamplingPeriodSec: 60 +# Enforce authentication +authenticationEnabled: false +# Enforce authorization on accessing functions api +authorizationEnabled: false +# Set of autentication provider name list, which is a list of class names +authenticationProviders: +# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api +superUserRoles: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 6338ce9..f97f180 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -82,8 +82,8 @@ public class FunctionsBase extends AdminResource implements Supplier clusters = Sets.newHashSet(Lists.newArrayList("use")); +admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + +String roleName = validRoleName ? "superUser" : "invalid"; +TenantInfo propAdmin = new TenantInfo(); +propAdmin.getAdminRoles().add(roleName); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); +admin.tenants().updateTenant(tenant, propAdmin); + +String jarFilePathUrl = Utils.FILE + ":" ++ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); +FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, +sinkTopic, subscriptionName); +try { +admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); +Assert.assertTrue(validRoleName); +} catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { +Assert.assertFalse(validRoleName); +} +} } \ No newline at end of file diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 9b10cbe..029f573 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -36,6 +36,12 @@ ${project.groupId} + pulsar-broker-common + ${project.version} + + + + ${project.groupId} pulsar-functions-runtime ${project.version} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 09f54ef..eda9b15 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -20,14 +20,18 @@ package org.apache.pulsar.functions.worker; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Properties; +import java.util.
[incubator-pulsar] branch master updated: support subscription-type to be passed in sink-function (#2200)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7ba5c2d support subscription-type to be passed in sink-function (#2200) 7ba5c2d is described below commit 7ba5c2d4f6048762413f6cc1faab3dbf3210a08f Author: Rajan Dhabalia AuthorDate: Thu Jul 19 23:48:26 2018 -0700 support subscription-type to be passed in sink-function (#2200) * support subscription-type to be passed in sink-function * Fix: test * retain-ordering --- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 9 + .../java/org/apache/pulsar/functions/utils/FunctionConfig.java | 2 +- .../main/java/org/apache/pulsar/functions/utils/SinkConfig.java | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 83a5b57..2043877 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -55,6 +55,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.Function.SubscriptionType; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.SinkConfig; import org.apache.pulsar.functions.utils.Utils; @@ -212,6 +213,8 @@ public class CmdSinks extends CmdBase { protected String customSerdeInputString; @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink") protected FunctionConfig.ProcessingGuarantees processingGuarantees; +@Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order") +protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected Integer parallelism; @Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) @@ -261,6 +264,8 @@ public class CmdSinks extends CmdBase { if (null != processingGuarantees) { sinkConfig.setProcessingGuarantees(processingGuarantees); } + +sinkConfig.setRetainOrdering(retainOrdering); Map topicsToSerDeClassName = new HashMap<>(); if (null != inputs) { @@ -477,6 +482,10 @@ public class CmdSinks extends CmdBase { if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); } + +sourceSpecBuilder.setSubscriptionType( +sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER : SubscriptionType.SHARED); + functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 3140f95..42c829c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -55,7 +55,7 @@ public class FunctionConfig { ATMOST_ONCE, EFFECTIVELY_ONCE } - + public enum Runtime { JAVA, PYTHON diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index 3414973..7e97135 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -32,7 +32,6 @@ import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations. import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
[incubator-pulsar] branch master updated: Add support to configure subscription name for sink-function (#2198)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e28ef1b Add support to configure subscription name for sink-function (#2198) e28ef1b is described below commit e28ef1b4f7b6cb44cb424b7f121085fd5c1a87d9 Author: Rajan Dhabalia AuthorDate: Wed Jul 18 17:41:30 2018 -0700 Add support to configure subscription name for sink-function (#2198) --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 15 ++-- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 27 +++--- .../functions/instance/JavaInstanceRunnable.java | 4 +++- .../proto/src/main/proto/Function.proto| 1 + .../apache/pulsar/functions/utils/SinkConfig.java | 1 + 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 3ece269..78fd2e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -249,6 +249,7 @@ public class PulsarSinkE2ETest { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; +final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -260,7 +261,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, -sinkTopic); +sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -283,8 +284,7 @@ public class PulsarSinkE2ETest { } retryStrategically((test) -> { try { -SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() -.next(); +SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -314,6 +314,7 @@ public class PulsarSinkE2ETest { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; +final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -324,7 +325,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, -sinkTopic); +sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -347,8 +348,7 @@ public class PulsarSinkE2ETest { } retryStrategically((test) -> { try { -SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() -.next(); +SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -372,7 +372,7 @@ public class PulsarSinkE2ETest { Assert.assertEquals((int) success, totalMsgs); } -protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic) { +protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { File file = new File
[incubator-pulsar] branch master updated: Add kinesis-sink user metrics to sinkContext (#2169)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ba1ea66 Add kinesis-sink user metrics to sinkContext (#2169) ba1ea66 is described below commit ba1ea665f0a073701df68eeff3a6b2541f6d1fa6 Author: Rajan Dhabalia AuthorDate: Mon Jul 16 14:00:46 2018 -0700 Add kinesis-sink user metrics to sinkContext (#2169) --- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 34 ++ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index c3b6cdc..c3c2c45 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -88,21 +88,32 @@ public class KinesisSink implements Sink { private String streamName; private static final String defaultPartitionedKey = "default"; private static final int maxPartitionedKeyLength = 256; +private SinkContext sinkContext; public static final String ACCESS_KEY_NAME = "accessKey"; public static final String SECRET_KEY_NAME = "secretKey"; +public static final String METRICS_TOTAL_INCOMING = "_kinesis_total_incoming_"; +public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_"; +public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_"; +public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_"; + + @Override public void write(Record record) throws Exception { String partitionedKey = record.getKey().orElse(defaultPartitionedKey); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256 +ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record); ListenableFuture addRecordResult = kinesisProducer.addUserRecord(this.streamName, -partitionedKey, -createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record)); +partitionedKey, data); addCallback(addRecordResult, -ProducerSendCallback.create(this.streamName, record, System.nanoTime()), directExecutor()); +ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext), directExecutor()); +if (sinkContext != null) { +sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1); +sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length); +} if (LOG.isDebugEnabled()) { LOG.debug("Published message to kinesis stream {} with size {}", streamName, record.getValue().length); } @@ -120,6 +131,7 @@ public class KinesisSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { kinesisSinkConfig = KinesisSinkConfig.load(config); +this.sinkContext = sinkContext; checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name"); checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty aws-end-point"); @@ -158,16 +170,19 @@ public class KinesisSink implements Sink { private String streamName; private long startTime = 0; private final Handle recyclerHandle; +private SinkContext sinkContext; private ProducerSendCallback(Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } -static ProducerSendCallback create(String streamName, Record resultContext, long startTime) { +static ProducerSendCallback create(String streamName, Record resultContext, long startTime, +SinkContext sinkContext) { ProducerSendCallback sendCallback = RECYCLER.get(); sendCallback.resultContext = resultContext; sendCallback.streamName = streamName; sendCallback.startTime = startTime; +sendCallback.sinkContext = sinkContext; return sendCallback; } @@ -175,6 +190,7 @@ public class KinesisSink implements Sink { resultContext = null; streamName = null; startTime = 0; +sinkContext = null; recyclerHandle.recycle(this); } @@ -188,10 +204,13 @@ public class
[incubator-pulsar] branch master updated: Add function metrics with function-stats to get metrics on-demand (#2130)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5f779b4 Add function metrics with function-stats to get metrics on-demand (#2130) 5f779b4 is described below commit 5f779b4ce541e1201354ca5454bebf1276e93f0b Author: Rajan Dhabalia AuthorDate: Mon Jul 16 14:00:07 2018 -0700 Add function metrics with function-stats to get metrics on-demand (#2130) --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 84 -- .../functions/instance/JavaInstanceRunnable.java | 22 -- .../src/main/python/InstanceCommunication_pb2.py | 44 +++- .../instance/src/main/python/python_instance.py| 1 + .../src/main/proto/InstanceCommunication.proto | 1 + .../pulsar/functions/runtime/ThreadRuntime.java| 13 +++- 6 files changed, 133 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5d70525..3ece269 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -48,22 +48,26 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.WorkerServer; @@ -244,6 +248,7 @@ public class PulsarSinkE2ETest { final String sinkTopic = "persistent://" + replNamespace + "/output"; final String propertyKey = "key"; final String propertyValue = "value"; +final String functionName = "PulsarSink-test"; admin.namespaces().createNamespace(replNamespace); Set clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -254,7 +259,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); -FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test", +FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, sinkTopic); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); @@ -298,7 +303,76 @@ public class PulsarSinkE2ETest { } -protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) { + +@Test(timeOut = 2) +public void testPulsarSinkStats() throws Exception { + +final String namespacePortion = "io"; +final String replNamespace = tenant + "/" + namespacePortion; +final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; +final String sinkTopic = "persistent://" + replNamespace + "/output"; +final String propertyKey = "key"; +f
[incubator-pulsar] branch master updated: Add pulsar-version comment for future reference (#2163)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new b2ef2e4 Add pulsar-version comment for future reference (#2163) b2ef2e4 is described below commit b2ef2e45c44b07f3586ac46ed1bf40c785d4b178 Author: Rajan Dhabalia AuthorDate: Sat Jul 14 17:57:15 2018 -0700 Add pulsar-version comment for future reference (#2163) --- pulsar-common/src/main/proto/PulsarApi.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 7d61e5a..06eca29 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -160,7 +160,7 @@ enum ProtocolVersion { v12 = 12;// Added get topic's last messageId from broker // Added CommandActiveConsumerChange // Added CommandGetTopicsOfNamespace - v13 = 13; + v13 = 13; // Schema-registry : added avro schema format for json } message CommandConnect {
[incubator-pulsar] branch master updated: fix: conflicting jackson transitive dependency for worker (#2159)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e54c58a fix: conflicting jackson transitive dependency for worker (#2159) e54c58a is described below commit e54c58a3b143fc2a0b428230c8593662dcf92891 Author: Rajan Dhabalia AuthorDate: Fri Jul 13 17:50:51 2018 -0700 fix: conflicting jackson transitive dependency for worker (#2159) --- pulsar-functions/worker/pom.xml | 15 --- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index be8e704..9b10cbe 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -101,12 +101,21 @@ commons-io commons-io - - + + io.swagger swagger-core + + + com.fasterxml.jackson.core + * + + + com.fasterxml.jackson.dataformat + * + + -
[incubator-pulsar] branch master updated: Fix: file-url sink/source submission with cli (#2150)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3b8faea Fix: file-url sink/source submission with cli (#2150) 3b8faea is described below commit 3b8faea91e3dcd264545b0a36c58ea28d4a8bbba Author: Rajan Dhabalia AuthorDate: Fri Jul 13 15:24:37 2018 -0700 Fix: file-url sink/source submission with cli (#2150) --- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 23 +++-- .../org/apache/pulsar/admin/cli/CmdSources.java| 21 ++- .../src/test/resources/pulsar-io-cassandra.nar | Bin 0 -> 9588943 bytes .../src/test/resources/pulsar-io-twitter.nar | Bin 0 -> 6537014 bytes .../apache/pulsar/functions/utils/SinkConfig.java | 1 + .../pulsar/functions/utils/SourceConfig.java | 1 + 6 files changed, 35 insertions(+), 11 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index c27b40a..31f3e0a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -224,6 +224,8 @@ public class CmdSinks extends CmdBase { protected Integer parallelism; @Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String archive; +@Parameter(names = "--className", description = "The sink's class name if archive is file-url-path (file://)") +protected String className; @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the " + "sink's configuration") @@ -257,6 +259,10 @@ public class CmdSinks extends CmdBase { sinkConfig.setNamespace(namespace); } +if (null != className) { +sinkConfig.setClassName(className); +} + if (null != name) { sinkConfig.setName(name); } @@ -427,12 +433,17 @@ public class CmdSinks extends CmdBase { boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); if (!isBuiltin) { -sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); - -try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), -Collections.emptySet())) { -typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null -: Utils.getSinkType(sinkClassName, ncl).getName(); +if (sinkConfig.getArchive().startsWith(Utils.FILE)) { +if (StringUtils.isBlank(sinkConfig.getClassName())) { +throw new ParameterException("Class-name must be present for archive with file-url"); +} +sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class +} else { +sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); +try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), +Collections.emptySet())) { +typeArg = Utils.getSinkType(sinkClassName, ncl).getName(); +} } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 7c147e5..d050ab5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -216,6 +216,8 @@ public class CmdSources extends CmdBase { @Parameter(names = { "-a", "--archive" }, description = "The path to the NAR archive for the Source. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String archive; +@Parameter(names = "--className", description = "The source's class name if archive is file-url-path (file://)") +protected String classNa
[incubator-pulsar] branch master updated: Function: REST and CLI to get list of workers in cluster (#2112)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e9d67bc Function: REST and CLI to get list of workers in cluster (#2112) e9d67bc is described below commit e9d67bce84ec880fbea830ec5d8172ba7aedc1c9 Author: Rajan Dhabalia AuthorDate: Thu Jul 12 18:24:13 2018 -0700 Function: REST and CLI to get list of workers in cluster (#2112) --- .../pulsar/broker/admin/impl/FunctionsBase.java| 22 ++- .../apache/pulsar/io/PulsarFunctionAdminTest.java | 220 + .../org/apache/pulsar/client/admin/Functions.java | 8 + .../client/admin/internal/FunctionsImpl.java | 10 + .../org/apache/pulsar/admin/cli/CmdFunctions.java | 13 ++ .../apache/pulsar/functions/worker/WorkerInfo.java | 52 + pulsar-functions/worker/pom.xml| 6 + .../functions/worker/FunctionRuntimeManager.java | 6 +- .../pulsar/functions/worker/MembershipManager.java | 30 --- .../pulsar/functions/worker/WorkerConfig.java | 2 +- .../functions/worker/rest/api/FunctionsImpl.java | 13 +- .../worker/rest/api/v2/FunctionApiV2Resource.java | 23 ++- .../functions/worker/MembershipManagerTest.java| 18 +- .../functions/worker/SchedulerManagerTest.java | 73 +++ 14 files changed, 408 insertions(+), 88 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 95f7f0e..6338ce9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.function.Supplier; + import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -29,25 +30,27 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import org.apache.pulsar.functions.worker.MembershipManager; +import org.apache.pulsar.functions.worker.WorkerInfo; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + public class FunctionsBase extends AdminResource implements Supplier { private final FunctionsImpl functions; @@ -199,7 +202,7 @@ public class FunctionsBase extends AdminResource implements Supplier getWorkers() { +return functions.getWorkers(); +} + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java new file mode 100644 index 000..537c1c8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.mockito.Mockito.spy; + +import java
[incubator-pulsar] branch master updated: add flatbuffer option to serialize kinesis-message in KinesisSink (#2108)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2a6d195 add flatbuffer option to serialize kinesis-message in KinesisSink (#2108) 2a6d195 is described below commit 2a6d1950954e9d0b1ec33c7fc738ebbdab0a4b9e Author: Rajan Dhabalia AuthorDate: Thu Jul 12 16:59:00 2018 -0700 add flatbuffer option to serialize kinesis-message in KinesisSink (#2108) * add flatbuffer option to serialize kinesis-message * add flatbuffer license * fix license --- distribution/server/src/assemble/LICENSE.bin.txt | 2 + pom.xml| 12 ++ .../pulsar/common/api/EncryptionContext.java | 1 - pulsar-io/kinesis/pom.xml | 6 + .../kinesis/src/main/fb/KinesisMessageApi.fbs | 59 +++ .../org/apache/pulsar/io/kinesis/KinesisSink.java | 10 +- .../pulsar/io/kinesis/KinesisSinkConfig.java | 6 +- .../java/org/apache/pulsar/io/kinesis/Utils.java | 129 ++-- .../pulsar/io/kinesis/fbs/CompressionType.java | 15 ++ .../pulsar/io/kinesis/fbs/EncryptionCtx.java | 68 + .../pulsar/io/kinesis/fbs/EncryptionKey.java | 52 +++ .../org/apache/pulsar/io/kinesis/fbs/KeyValue.java | 41 + .../org/apache/pulsar/io/kinesis/fbs/Message.java | 53 +++ .../org/apache/pulsar/io/kinesis/UtilsTest.java| 170 - 14 files changed, 572 insertions(+), 52 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 763f62c..33b9748 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -459,6 +459,8 @@ The Apache Software License, Version 2.0 - org.inferred-freebuilder-1.14.9.jar * Snappy Java - org.xerial.snappy-snappy-java-1.1.1.3.jar + * Flatbuffers Java +- com.google.flatbuffers-flatbuffers-java-1.9.0.jar BSD 3-clause "New" or "Revised" License diff --git a/pom.xml b/pom.xml index ba705a7..8a0da98 100644 --- a/pom.xml +++ b/pom.xml @@ -946,6 +946,11 @@ flexible messaging model and an intuitive client API. src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java **/ByteBufCodedInputStream.java **/ByteBufCodedOutputStream.java bin/proto/* @@ -1050,6 +1055,13 @@ flexible messaging model and an intuitive client API. src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java bin/proto/MLDataFormats_pb2.py + + + src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java + src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java index 98eaad7..ff359c5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java @@ -34,7 +34,6 @@ public class EncryptionContext { private Map keys; private byte[] param; -private Map metadata; private String algorithm; private CompressionType compressionType; private int uncompressedMessageSize; diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 08c3004..f0c2776 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -81,6 +81,12 @@ 0.12.8 + + + com.google.flatbuffers + flatbuffers-java + 1.9.0 + diff --git a/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs b/p
[incubator-pulsar] branch master updated: add list-connector rest api at workers (#2146)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d43bf87 add list-connector rest api at workers (#2146) d43bf87 is described below commit d43bf87f9f3a31cb7f4c06ec08b643da53ac20f7 Author: Rajan Dhabalia AuthorDate: Thu Jul 12 16:00:52 2018 -0700 add list-connector rest api at workers (#2146) --- .../pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java | 7 +++ 1 file changed, 7 insertions(+) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index c382bd6..9c71f05 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -22,6 +22,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; +import org.apache.pulsar.common.io.ConnectorDefinition; +import java.util.List; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -158,4 +160,9 @@ public class FunctionApiV2Resource extends FunctionApiResource { return functions.downloadFunction(path); } +@GET +@Path("/connectors") +public List getConnectorsList() throws IOException { +return functions.getListOfConnectors(); +} }
[incubator-pulsar] branch master updated: Schedule task to update function stats separately (#2128)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 14fc5d9 Schedule task to update function stats separately (#2128) 14fc5d9 is described below commit 14fc5d956c0044743984e839cce0acdd9d7f5336 Author: Rajan Dhabalia AuthorDate: Thu Jul 12 14:54:49 2018 -0700 Schedule task to update function stats separately (#2128) keep previous stats sample configurable update metrics task fix test --- conf/functions_worker.yml | 1 + .../pulsar/functions/instance/ContextImpl.java | 19 +++-- .../pulsar/functions/instance/FunctionStats.java | 25 +++- .../pulsar/functions/instance/JavaInstance.java| 8 .../functions/instance/JavaInstanceRunnable.java | 45 +++-- .../src/main/python/InstanceCommunication_pb2.py | 24 +-- .../main/python/InstanceCommunication_pb2_grpc.py | 34 .../instance/src/main/python/contextimpl.py| 23 --- .../instance/src/main/python/python_instance.py| 46 +- .../instance/src/main/python/server.py | 9 + .../src/main/proto/InstanceCommunication.proto | 2 + .../pulsar/functions/runtime/JavaInstanceMain.java | 30 ++ .../pulsar/functions/runtime/ProcessRuntime.java | 44 + .../apache/pulsar/functions/runtime/Runtime.java | 4 ++ .../pulsar/functions/runtime/ThreadRuntime.java| 12 ++ .../functions/worker/FunctionRuntimeManager.java | 18 + .../functions/worker/FunctionsStatsGenerator.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 3 +- .../pulsar/functions/worker/WorkerService.java | 18 + .../worker/FunctionStatsGeneratorTest.java | 3 +- 20 files changed, 336 insertions(+), 36 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 73edbfb..58bcf1d 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -44,3 +44,4 @@ rescheduleTimeoutMs: 6 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 3 +metricsSamplingPeriodSec: 60 diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 0a1b965..b41d6f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -91,6 +91,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { } } +private ConcurrentMap currentAccumulatedMetrics; private ConcurrentMap accumulatedMetrics; private Map publishProducers; @@ -110,6 +111,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.logger = logger; this.pulsarClient = client; this.classLoader = classLoader; +this.currentAccumulatedMetrics = new ConcurrentHashMap<>(); this.accumulatedMetrics = new ConcurrentHashMap<>(); this.publishProducers = new HashMap<>(); this.publishSerializers = new HashMap<>(); @@ -324,11 +326,23 @@ class ContextImpl implements Context, SinkContext, SourceContext { @Override public void recordMetric(String metricName, double value) { -accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); -accumulatedMetrics.get(metricName).update(value); +currentAccumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); +currentAccumulatedMetrics.get(metricName).update(value); } public MetricsData getAndResetMetrics() { +MetricsData retval = getMetrics(); +resetMetrics(); +return retval; +} + +public void resetMetrics() { +this.accumulatedMetrics.clear(); +this.accumulatedMetrics.putAll(currentAccumulatedMetrics); +this.currentAccumulatedMetrics.clear(); +} + +public MetricsData getMetrics() { MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); for (String metricName : accumulatedMetrics.keySet()) { MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder(); @@ -339,7 +353,6 @@ class ContextImpl implements Context, SinkContext, SourceContext { metricsDataBuilder.putMetrics(metricName, bldr.build()); } MetricsData retval = metricsDataBuilder.build(); -accumulatedMetrics.clear(); return retval; } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org
[incubator-pulsar] branch master updated: Temporarely disable SequenceIdWithErrorTest (#2140)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8631067 Temporarely disable SequenceIdWithErrorTest (#2140) 8631067 is described below commit 86310674a6d2b8c4e5655c5658117d1b94b036dc Author: Matteo Merli AuthorDate: Wed Jul 11 23:10:36 2018 -0700 Temporarely disable SequenceIdWithErrorTest (#2140) --- .../java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index 895d788..60e7cd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -39,7 +39,7 @@ public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests { /** * Test that sequence id from a producer is correct when there are send errors */ -@Test +@Test(enabled = false) public void testCheckSequenceId() throws Exception { admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
[incubator-pulsar] branch master updated: derive worker-host and id at runtime if not provided (#2113)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7047146 derive worker-host and id at runtime if not provided (#2113) 7047146 is described below commit 704714673a8dcfdf8a2b2106f1b3c9e45341cbe7 Author: Rajan Dhabalia AuthorDate: Mon Jul 9 13:39:46 2018 -0700 derive worker-host and id at runtime if not provided (#2113) --- .../pulsar/functions/worker/WorkerConfig.java | 28 +- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index af1f162..c7ebcf9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -24,6 +24,10 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.commons.lang3.StringUtils; import lombok.Data; import lombok.EqualsAndHashCode; @@ -69,7 +73,7 @@ public class WorkerConfig implements Serializable { private String tlsTrustCertsFilePath = ""; private boolean tlsAllowInsecureConnection = false; private boolean tlsHostnameVerificationEnable = false; - + @Data @Setter @Getter @@ -108,4 +112,26 @@ public class WorkerConfig implements Serializable { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), WorkerConfig.class); } + +public String getWorkerId() { +if (StringUtils.isBlank(this.workerId)) { +this.workerId = getWorkerHostname(); +} +return this.workerId; +} + +public String getWorkerHostname() { +if (StringUtils.isBlank(this.workerHostname)) { +this.workerHostname = unsafeLocalhostResolve(); +} +return this.workerHostname; +} + +public static String unsafeLocalhostResolve() { +try { +return InetAddress.getLocalHost().getHostName(); +} catch (UnknownHostException ex) { +throw new IllegalStateException("Failed to resolve localhost name.", ex); +} +} }
[incubator-pulsar] branch master updated: Kinesis sink publish full json message (#2079)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new da8981c Kinesis sink publish full json message (#2079) da8981c is described below commit da8981c7becab94443cdac41a9495654943570ef Author: Rajan Dhabalia AuthorDate: Fri Jul 6 11:59:29 2018 -0700 Kinesis sink publish full json message (#2079) * Kinesis sink publish full json message * fix pulsar typo --- pulsar-io/core/pom.xml | 13 ++ .../org/apache/pulsar/io/kinesis/KinesisSink.java | 23 ++- .../pulsar/io/kinesis/KinesisSinkConfig.java | 26 +++- .../java/org/apache/pulsar/io/kinesis/Utils.java | 103 + .../org/apache/pulsar/io/kinesis/UtilsTest.java| 162 + 5 files changed, 324 insertions(+), 3 deletions(-) diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml index 30da3d5..003e6d8 100644 --- a/pulsar-io/core/pom.xml +++ b/pulsar-io/core/pom.xml @@ -47,6 +47,19 @@ + + + ${project.groupId} + protobuf-shaded + ${project.version} + + + com.google.protobuf + protobuf-java + + + + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 5a43312..eaa2b91 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,9 +67,17 @@ import io.netty.util.Recycler.Handle; * which accepts json-map of credentials in awsCredentialPluginParam * eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"} * 5. awsCredentialPluginParam: json-parameters to initialize {@link AwsCredentialProviderPlugin} - * + * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"] + * a. ONLY_RAW_PAYLOAD: publishes raw payload to stream + * b. FULL_MESSAGE_IN_JSON: publish full message (encryptionCtx + properties + payload) in json format + * json-schema: + * {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]}," [...] + * Example: + * {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}} * * + * + * */ public class KinesisSink implements Sink { @@ -92,7 +101,8 @@ public class KinesisSink implements Sink { ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256 ListenableFuture addRecordResult = kinesisProducer.addUserRecord(this.streamName, -partitionedKey, ByteBuffer.wrap(value)); +partitionedKey, + ByteBuffer.wrap(createKinesi
[incubator-pulsar] branch master updated: avoid unnecessary reflection to create pulsar source instance (#2083)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new f71f4ba avoid unnecessary reflection to create pulsar source instance (#2083) f71f4ba is described below commit f71f4ba8c34d0b196c544bf04fc8d2f5a393087c Author: Rajan Dhabalia AuthorDate: Thu Jul 5 16:22:49 2018 -0700 avoid unnecessary reflection to create pulsar source instance (#2083) --- .../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 9 + 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index bd1433c..5485d72 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -467,14 +467,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { if (sourceSpec.getTimeoutMs() > 0 ) { pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); } - -Object[] params = {this.client, pulsarSourceConfig}; -Class[] paramTypes = {PulsarClient.class, PulsarSourceConfig.class}; - -object = Reflections.createInstance( -PulsarSource.class.getName(), -PulsarSource.class.getClassLoader(), params, paramTypes); - +object = new PulsarSource(this.client, pulsarSourceConfig); } else { object = Reflections.createInstance( sourceSpec.getClassName(),
[incubator-pulsar] branch master updated: Cleanup unused function resource (#2082)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d62cdd8 Cleanup unused function resource (#2082) d62cdd8 is described below commit d62cdd884b324aa1fb210687a674d62877a6b809 Author: Rajan Dhabalia AuthorDate: Thu Jul 5 14:55:23 2018 -0700 Cleanup unused function resource (#2082) --- .../functions/worker/rest/api/FunctionsImpl.java | 423 - 1 file changed, 158 insertions(+), 265 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 9da5e6d..14d7a81 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -19,9 +19,11 @@ package org.apache.pulsar.functions.worker.rest.api; import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.gson.Gson; - +import static org.apache.pulsar.functions.utils.Reflections.createInstance; +import static org.apache.pulsar.functions.utils.Utils.FILE; +import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; +import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; import java.io.File; import java.io.FileInputStream; @@ -32,25 +34,21 @@ import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; -import java.util.*; +import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; + import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; -import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -66,22 +64,21 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import static org.apache.pulsar.functions.utils.Reflections.createInstance; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; -import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.Utils; -import static org.apache.pulsar.functions.utils.Utils.HTTP; -import static org.apache.pulsar.functions.utils.Utils.FILE; -import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import org.glassfish.jersey.media.multipart.FormDataParam; + +import com.google.gson.Gson; + +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; @Slf4j public class FunctionsImpl { @@ -112,16 +109,9 @@ public class FunctionsImpl { return true; } -@POST -@Path("/{tenant}/{namespace}/{functionName}") -@Consumes(MediaType.MULTIPART_FORM_DATA) -public Response registerFunction(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("functionName") String functionName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, -
[incubator-pulsar] branch master updated: fix: configure crypto-action for internal consumer independently cryptoReader (#2073)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e422a6f fix: configure crypto-action for internal consumer independently cryptoReader (#2073) e422a6f is described below commit e422a6f93d2e1eb9a3469401966e5c7c4a4a9bda Author: Rajan Dhabalia AuthorDate: Tue Jul 3 12:58:00 2018 -0700 fix: configure crypto-action for internal consumer independently cryptoReader (#2073) * fix: configure crypto-action for internal consumer independently cryptoReader * add test-case for multi-topic + cryptoFailureAction --- .../pulsar/client/api/SimpleProducerConsumerTest.java | 13 +++-- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 50d9687..e3ea2a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi; @@ -2430,18 +2431,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } } -Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") - .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) -.subscribe(); - Producer producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) .cryptoKeyReader(new EncKeyReader()).create(); + +Consumer consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) +.subscribe(); String message = "my-message"; producer.send(message.getBytes()); -MessageImpl msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS); +TopicMessageImpl msg = (TopicMessageImpl) consumer.receive(5, TimeUnit.SECONDS); String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader()); assertEquals(message, receivedMessage); @@ -2450,7 +2451,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } -private String decryptMessage(MessageImpl msg, String encryptionKeyName, CryptoKeyReader reader) +private String decryptMessage(TopicMessageImpl msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception { Optional ctx = msg.getEncryptionCtx(); Assert.assertTrue(ctx.isPresent()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4be8f58..5221b70 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -484,14 +484,14 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); internalConsumerConfig.setProperties(conf.getProperties()); internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); - + internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); + if (null != conf.getConsumerEventListener()) { internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); } if (conf.getCryptoKeyReader() != null) { internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); - internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); } if (conf.getAckTimeoutMillis() != 0) { internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
[incubator-pulsar] branch master updated: By default PulsarSource consume encrypted message (#2074)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8e95ddc By default PulsarSource consume encrypted message (#2074) 8e95ddc is described below commit 8e95ddc5f3103279a39bd98031b3d16fb17dbdb3 Author: Rajan Dhabalia AuthorDate: Tue Jul 3 11:00:40 2018 -0700 By default PulsarSource consume encrypted message (#2074) * By default PulsarSource consume encrypted message * fix: mock test --- .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 2 ++ .../test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java | 1 + 2 files changed, 3 insertions(+) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e5100c8..54373ba 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -67,6 +67,8 @@ public class PulsarSource implements Source { // Setup pulsar consumer ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer() +//consume message even if can't decrypt and deliver it along with encryption-ctx +.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 3c5e61b..a7e3610 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -75,6 +75,7 @@ public class PulsarSourceTest { PulsarClient pulsarClient = mock(PulsarClient.class); ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); doReturn(consumerBuilder).when(consumerBuilder).topics(anyList()); + doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any()); doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
[incubator-pulsar] branch master updated: Fix: destination lookup url for v2 (#2075)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5d73bb4 Fix: destination lookup url for v2 (#2075) 5d73bb4 is described below commit 5d73bb40f1ade12ba27d49d26cb50477f4e39c30 Author: Rajan Dhabalia AuthorDate: Tue Jul 3 11:00:07 2018 -0700 Fix: destination lookup url for v2 (#2075) --- .../src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index a5b0455..185412c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -100,7 +100,7 @@ class HttpLookupService implements LookupService { public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace) { CompletableFuture> future = new CompletableFuture<>(); -String format = namespace.isV2() ? "admin/v2/namespaces/%s/destinations" : "admin/namespaces/%s/destinations"; +String format = namespace.isV2() ? "admin/v2/namespaces/%s/topics" : "admin/namespaces/%s/destinations"; httpClient .get(String.format(format, namespace), String[].class) .thenAccept(topics -> {
[incubator-pulsar] branch master updated: Pass encryption-context to pulsar-sink if source receive encrypted message (#2068)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5e4579d Pass encryption-context to pulsar-sink if source receive encrypted message (#2068) 5e4579d is described below commit 5e4579d2fbfa70ac0668a298c269e31d3b54ab13 Author: Rajan Dhabalia AuthorDate: Mon Jul 2 17:51:33 2018 -0700 Pass encryption-context to pulsar-sink if source receive encrypted message (#2068) * Pass encryption-context to pulsar-sink if source receive encrypted message * move Encryption-ctx to api-pkg --- .../client/api/SimpleProducerConsumerTest.java | 4 ++-- .../client/api/ConsumerCryptoFailureAction.java | 2 +- .../java/org/apache/pulsar/client/api/Message.java | 12 .../org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++-- .../org/apache/pulsar/client/impl/MessageImpl.java | 2 ++ .../apache/pulsar/client/impl/MessageRecordImpl.java | 15 +++ .../apache/pulsar/client/impl/TopicMessageImpl.java | 9 - .../apache/pulsar/common/api}/EncryptionContext.java | 2 +- .../apache/pulsar/functions/source/PulsarRecord.java | 3 +++ .../apache/pulsar/functions/source/PulsarSource.java | 2 ++ pulsar-io/core/pom.xml | 12 .../org/apache/pulsar/io/core/RecordContext.java | 20 12 files changed, 80 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 7fb027d..50d9687 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -55,8 +55,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.EncryptionContext; -import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java index e71b798..3ab48f7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java @@ -19,7 +19,7 @@ package org.apache.pulsar.client.api; -import org.apache.pulsar.client.impl.EncryptionContext; +import org.apache.pulsar.common.api.EncryptionContext; public enum ConsumerCryptoFailureAction { FAIL, // This is the default option to fail consume until crypto succeeds diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java index 33be458..d614962 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java @@ -20,6 +20,10 @@ package org.apache.pulsar.client.api; import java.util.Map; +import java.util.Optional; + +import org.apache.pulsar.common.api.EncryptionContext; + /** * The message abstraction used in Pulsar. @@ -127,4 +131,12 @@ public interface Message { * @return the key of the message */ String getKey(); + +/** + * {@link EncryptionContext} contains encryption and compression information in it using which application can + * decrypt consumed message with encrypted-payload. + * + * @return + */ +Optional getEncryptionCtx(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e7924cb..6f78119 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -63,9 +63,10 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; -import
[incubator-pulsar] branch master updated: Forward encryption properties with encrypted payload to consumer (#2024)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9b1cc78 Forward encryption properties with encrypted payload to consumer (#2024) 9b1cc78 is described below commit 9b1cc78e3e3f9897f931d186afdf5e02dc399136 Author: Rajan Dhabalia AuthorDate: Sun Jul 1 15:40:02 2018 -0700 Forward encryption properties with encrypted payload to consumer (#2024) * Forward encryption properties with encrypted payload to consumer * add EncryptionCtx to message to store encryption metadata --- .../client/api/SimpleProducerConsumerTest.java | 161 - .../client/api/ConsumerCryptoFailureAction.java| 19 ++- .../apache/pulsar/client/impl/ConsumerImpl.java| 65 - .../EncryptionContext.java}| 41 -- .../apache/pulsar/client/impl/MessageCrypto.java | 2 +- .../org/apache/pulsar/client/impl/MessageImpl.java | 16 +- .../apache/pulsar/client/impl/MessageParser.java | 3 +- 7 files changed, 280 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index ab2af4c..7fb027d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -35,6 +35,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -54,10 +55,22 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.EncryptionContext; +import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey; +import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarDecoder; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -67,8 +80,12 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + public class SimpleProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); @@ -,10 +2239,13 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer2.send(message.getBytes()); } -Message msg = null; +MessageImpl msg = null; for (int i = 0; i < totalMsg * 2; i++) { -msg = consumer.receive(5, TimeUnit.SECONDS); +msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS); +// verify that encrypted message contains encryption-context +msg.getEncryptionCtx() +.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); String receivedMessage = new String(msg.getData()); log.debug("Received message: [{}]", receivedMessage); String expectedMessage = "my-message-" + i; @@ -2276,7 +2296,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { final int totalMsg = 10; -Message msg = null; +MessageImpl msg = null; Set messageSet = Sets.newHashSet(); Consumer consumer = pulsarClient.newConsumer() .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name") @@ -2307,7 +2
[incubator-pulsar] branch master updated: Pulsar Kinesis-sink upgrade aws-sdk version (#2058)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 89c7b65 Pulsar Kinesis-sink upgrade aws-sdk version (#2058) 89c7b65 is described below commit 89c7b65f182f35c16558aeb5278208cb3f88146f Author: Rajan Dhabalia AuthorDate: Sun Jul 1 10:45:38 2018 -0700 Pulsar Kinesis-sink upgrade aws-sdk version (#2058) * Pulsar Kinesis-sink upgrade aws-sdk version * fix: formatting --- pulsar-io/kinesis/pom.xml | 20 ++-- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 613c047..ab39043 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -64,27 +64,22 @@ - -com.amazonaws -aws-java-sdk-core -1.11.46 + + com.amazonaws + aws-java-sdk-core com.amazonaws amazon-kinesis-client - 1.7.2 + 1.9.0 + com.amazonaws amazon-kinesis-producer 0.12.8 - - com.google.protobuf - protobuf-java - 2.6.1 - @@ -106,10 +101,7 @@ false - com.amazonaws*:* - com.google.protobuf*:* - ${project.groupId}:* - com.fasterxml.jackson*:* + *:*
[incubator-pulsar] branch master updated: Forward user-properties to sink (#2057)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7cebe23 Forward user-properties to sink (#2057) 7cebe23 is described below commit 7cebe2330bd7376df066a31c01231f0cb693deaf Author: Rajan Dhabalia AuthorDate: Sun Jul 1 10:45:27 2018 -0700 Forward user-properties to sink (#2057) --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 38 +- .../functions/instance/JavaInstanceRunnable.java | 4 +-- .../apache/pulsar/functions/sink/PulsarSink.java | 11 --- .../pulsar/functions/source/PulsarRecord.java | 4 +++ .../pulsar/functions/source/PulsarSource.java | 1 + 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 7b08f6b..5d70525 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -32,10 +32,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.test.PortManager; @@ -49,9 +45,12 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -80,7 +79,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; -import io.netty.util.concurrent.DefaultThreadFactory; import jersey.repackaged.com.google.common.collect.Lists; /** @@ -240,22 +238,26 @@ public class PulsarSinkE2ETest { @Test(timeOut = 2) public void testE2EPulsarSink() throws Exception { -final String namespacePortion = "myReplNs"; +final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; +final String sinkTopic = "persistent://" + replNamespace + "/output"; +final String propertyKey = "key"; +final String propertyValue = "value"; admin.namespaces().createNamespace(replNamespace); Set clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); // create a producer that creates a topic at broker -ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(sourceTopic); -Producer producer = producerBuilder.create(); +Producer producer = pulsarClient.newProducer().topic(sourceTopic).create(); +Consumer consumer = pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe(); String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); -FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test"); +FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test", +sinkTopic); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); - + // try to update function to test: update-function functionality admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); @@ -271,8 +273,8 @@ public class PulsarSinkE2ETest { int totalMsgs = 5; for (int i = 0; i < totalMsgs; i++) { -String message = "my-message-" + i; -producer.send(message.getBytes()); +String data = "my-message-" + i; +producer.newMessage().property(propertyKey, propertyValue).value(data.getBy
[incubator-pulsar] branch master updated: add admin-api to update function with url (#1987)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 358b68d add admin-api to update function with url (#1987) 358b68d is described below commit 358b68dd560640ed025f91b13971367ff345a591 Author: Rajan Dhabalia AuthorDate: Thu Jun 21 23:42:33 2018 -0700 add admin-api to update function with url (#1987) * add admin-api to update function with url * add cli option for jar-url --- .../pulsar/broker/admin/impl/FunctionsBase.java| 3 +- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 7 ++-- .../org/apache/pulsar/client/admin/Functions.java | 22 .../client/admin/internal/FunctionsImpl.java | 17 ++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 6 +++- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 6 +++- .../org/apache/pulsar/admin/cli/CmdSources.java| 12 +-- .../functions/worker/rest/api/FunctionsImpl.java | 21 .../worker/rest/api/v2/FunctionApiV2Resource.java | 3 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 39 ++ 10 files changed, 120 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index bacade6..6955d5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -95,10 +95,11 @@ public class FunctionsBase extends AdminResource implements Supplier { try { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index a86036c..135c337 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -117,6 +117,28 @@ public interface Functions { * Unexpected error */ void updateFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException; + +/** + * Update the configuration for a function. + * + * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * + * + * @param functionDetails + *the function configuration object + * @param pkgUrl + *url from which pkg can be downloaded + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ +void updateFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException; /** * Delete an existing function diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 68d3046..e7e39b1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -166,6 +166,23 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override +public void updateFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException { +try { +final FormDataMultiPart mp = new FormDataMultiPart(); + +mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + +mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails), +MediaType.APPLICATION_JSON_TYPE)); + request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()) +.path(functionDetails.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), +ErrorData.class); +} catch (Exception e) { +throw getApiException(e); +} +} + +@Override public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDataMultiPart(); diff --git a/pulsar-client-tools/src/main/ja
[incubator-pulsar] branch master updated: Add function-package-url support in function cli add url support to function cli (#1956)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 52c77d5 Add function-package-url support in function cli add url support to function cli (#1956) 52c77d5 is described below commit 52c77d57c528f2dc4b7022ed8e1082245e1a9729 Author: Rajan Dhabalia AuthorDate: Tue Jun 19 23:09:39 2018 -0700 Add function-package-url support in function cli add url support to function cli (#1956) * Add function-package-url support in function cli add url support to function cli * file-url comment * add server-side validation for source/sink class and arg-type remove cmd-line args --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 2 +- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 128 - .../org/apache/pulsar/admin/cli/CmdFunctions.java | 78 + .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 89 ++ .../org/apache/pulsar/admin/cli/CmdSources.java| 83 + .../apache/pulsar/functions/utils/SinkConfig.java | 3 - .../pulsar/functions/utils/SourceConfig.java | 1 - .../org/apache/pulsar/functions/utils/Utils.java | 10 ++ .../functions/utils/validation/ValidatorImpls.java | 21 +++- .../pulsar/functions/worker/FunctionActioner.java | 43 --- .../org/apache/pulsar/functions/worker/Utils.java | 8 +- .../functions/worker/rest/api/FunctionsImpl.java | 116 +-- .../functions/worker/FunctionActionerTest.java | 3 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 68 +++ 14 files changed, 545 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 531f8ad..4987a32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -64,7 +64,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.worker.Utils; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.WorkerServer; diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 5861e22..e44562a 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction; import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; +import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; +import org.apache.pulsar.admin.cli.CmdSources.CreateSource; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -39,8 +41,10 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.io.core.RecordContext; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -72,6 +76,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** * Unit test of {@link CmdFunctions}. @@ -91,6 +96,8 @@ public class CmdFunctionsTest { private PulsarAdmin admin; private Functions functions; private CmdFunctions cmd; +private CmdSinks cmdSinks; +private CmdSources cmdSources; public static class DummyFunction implements Function { @@ -103,7 +110,7 @@ public class CmdFunctionsTest { return null
[incubator-pulsar] branch master updated: Add documentation and flexible aws-credential plugin to support aws-role (#1972)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8ba8acd Add documentation and flexible aws-credential plugin to support aws-role (#1972) 8ba8acd is described below commit 8ba8acdbe812725df6739c5d19db3e47a553f351 Author: Rajan Dhabalia AuthorDate: Mon Jun 18 12:15:28 2018 -0700 Add documentation and flexible aws-credential plugin to support aws-role (#1972) * Add documentation and flexible aws-credential plugin to support aws-role * add protobuf relocation --- pulsar-io/kinesis/pom.xml | 11 +- .../io/kinesis/AwsCredentialProviderPlugin.java| 36 ++--- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 104 -- .../apache/pulsar/io/kinesis/KinesisSinkTest.java | 149 + 4 files changed, 227 insertions(+), 73 deletions(-) diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 7f9f488..0dee750 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -47,13 +47,16 @@ com.fasterxml.jackson.core jackson-databind - ${jackson.version} com.fasterxml.jackson.dataformat jackson-dataformat-yaml - ${jackson.version} + + + + com.google.code.gson + gson @@ -110,10 +113,6 @@ com.google.protobuf org.apache.pulsar.replicator.com.google.protobuf - - com.amazonaws - org.apache.pulsar.com.amazonaws - diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java index 8c616cc..7e463bb 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java @@ -19,42 +19,32 @@ package org.apache.pulsar.io.kinesis; +import java.io.Closeable; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicSessionCredentials; + /** * Kinesis source/sink calls credential-provider while refreshing aws accessKey and secreKey. So, implementation * AwsCredentialProviderPlugin needs to makes sure to return non-expired keys when it requires. * */ -public interface AwsCredentialProviderPlugin { - +public interface AwsCredentialProviderPlugin extends Closeable { + /** * accepts aws-account related param and initialize credential provider. - * + * * @param param */ void init(String param); - -/** - * Returns the AWS access key ID for this credentials object. - * - * @return The AWS access key ID for this credentials object. - */ -String getAWSAccessKeyId(); /** - * Returns the AWS secret access key for this credentials object. + * Returned {@link AWSCredentialsProvider} can give {@link AWSCredentials} in case credential belongs to IAM user or + * it can return {@link BasicSessionCredentials} if user wants to generate temporary credential for a given IAM + * role. * - * @return The AWS secret access key for this credentials object. - */ -String getAWSSecretKey(); - -/** - * Forces this credentials provider to refresh its credentials. For many - * implementations of credentials provider, this method may simply be a - * no-op, such as any credentials provider implementation that vends - * static/non-changing credentials. For other implementations that vend - * different credentials through out their lifetime, this method should - * force the credentials provider to refresh its credentials. + * @return */ -void refresh(); +AWSCredentialsProvider getCredentialProvider(); } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 39af1f3..5a43312 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -30,6 +30,9 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; @@ -37,20 +40,35 @@ import org.slf4j.LoggerFactory
[incubator-pulsar] branch master updated: Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8682776 Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945) 8682776 is described below commit 8682776b2ae82353e05975c69a2fa7cd21f99a03 Author: Rajan Dhabalia AuthorDate: Fri Jun 8 17:35:48 2018 -0700 Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 1 + .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 36 +- 2 files changed, 9 insertions(+), 28 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index cb4d2f9..2d90725 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -272,6 +272,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { for (CompletableFuture ledgerFuture : ledgers.values()) { ManagedLedgerImpl ledger = ledgerFuture.getNow(null); if (ledger == null) { +latch.countDown(); continue; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 328d6b2..531f8ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -26,7 +26,6 @@ import java.io.File; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.MalformedURLException; -import java.net.URI; import java.net.URL; import java.util.HashMap; import java.util.HashSet; @@ -103,8 +102,6 @@ public class PulsarSinkE2ETest { final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; String primaryHost; -ExecutorService executor; -ExecutorService workerExecutor; private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); @@ -125,9 +122,6 @@ public class PulsarSinkE2ETest { log.info("--- Setting up method {} ---", method.getName()); -executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue()); -workerExecutor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-worker-test")); - // Start local bookkeeper ensemble bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); bkEnsemble.start(); @@ -158,10 +152,7 @@ public class PulsarSinkE2ETest { functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); -boolean isFunctionWebServerRequired = method.getName() -.equals("testExternalReplicatorRedirectionToWorkerService"); -Optional functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null) -: Optional.of(functionsWorkerService); +Optional functionWorkerService = Optional.of(functionsWorkerService); pulsar = new PulsarService(config, functionWorkerService); pulsar.start(); @@ -197,29 +188,17 @@ public class PulsarSinkE2ETest { TenantInfo propAdmin = new TenantInfo(); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); - -if (isFunctionWebServerRequired) { -URI dlogURI = Utils.initializeDlogNamespace(config.getZookeeperServers(), "/ledgers"); -functionsWorkerService.start(dlogURI); -functionsWorkerServer = new WorkerServer(functionsWorkerService); -workerExecutor.submit(functionsWorkerServer); -} + Thread.sleep(100); } @AfterMethod void shutdown() throws Exception { log.info("--- Shutting down ---"); -if (executor != null) { -executor.shutdown(); -} -if (workerExecutor != null) { -workerExecutor.shutdown(); -} pulsarClient.close(); admin.close(); -pulsar.close(); functionsWorkerService.stop(); +pulsar.close(); bkEnsemble.stop(); } @@ -260,7 +239,7 @@ public class PulsarSinkE2ETest {
[incubator-pulsar] branch master updated: Use broker advertisedAddress + tls url in function worker (#1946)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2bf387d Use broker advertisedAddress + tls url in function worker (#1946) 2bf387d is described below commit 2bf387d00262db38754ca804956cd83fccd4c512 Author: Rajan Dhabalia AuthorDate: Fri Jun 8 17:36:07 2018 -0700 Use broker advertisedAddress + tls url in function worker (#1946) --- .../src/main/java/org/apache/pulsar/PulsarBrokerStarter.java | 11 +-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index 2a0e1a2..24e3fda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -144,8 +144,15 @@ public class PulsarBrokerStarter { workerConfig = WorkerConfig.load(starterArguments.fnWorkerConfigFile); } // worker talks to local broker -workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + brokerConfig.getBrokerServicePort()); -workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:; + brokerConfig.getWebServicePort()); +boolean useTls = workerConfig.isUseTls(); +String pulsarServiceUrl = useTls && isNotBlank(PulsarService.brokerUrlTls(brokerConfig)) +? PulsarService.brokerUrlTls(brokerConfig) +: PulsarService.brokerUrl(brokerConfig); +String webServiceUrl = useTls && isNotBlank(PulsarService.webAddressTls(brokerConfig)) +? PulsarService.webAddressTls(brokerConfig) +: PulsarService.webAddress(brokerConfig); +workerConfig.setPulsarServiceUrl(pulsarServiceUrl); +workerConfig.setPulsarWebServiceUrl(webServiceUrl); String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( brokerConfig.getAdvertisedAddress()); workerConfig.setWorkerHostname(hostname); -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Add Client auth plugin and tls support for function to connect with broker (#1935)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6e336b4 Add Client auth plugin and tls support for function to connect with broker (#1935) 6e336b4 is described below commit 6e336b4080f02370c667e1a93d5d7c65b982d562 Author: Rajan Dhabalia AuthorDate: Fri Jun 8 13:00:12 2018 -0700 Add Client auth plugin and tls support for function to connect with broker (#1935) * Add Client auth plugin and tls support for function to connect with broker * add authConfig builder * add hostnameverification and tlsCertPath * add broker-tls url on worker * take string type for boolean data-type --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 89 ++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 36 +++-- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 29 ++- .../org/apache/pulsar/admin/cli/CmdSources.java| 29 ++- .../client/impl/conf/ClientConfigurationData.java | 2 +- .../functions/instance/AuthenticationConfig.java | 44 +++ .../src/main/python/python_instance_main.py| 21 - .../pulsar/functions/runtime/JavaInstanceMain.java | 46 --- .../pulsar/functions/runtime/ProcessRuntime.java | 31 +++- .../functions/runtime/ProcessRuntimeFactory.java | 8 +- .../functions/runtime/ThreadRuntimeFactory.java| 41 +++--- .../functions/runtime/ProcessRuntimeTest.java | 2 +- .../functions/worker/FunctionRuntimeManager.java | 12 ++- .../pulsar/functions/worker/MembershipManager.java | 4 +- .../org/apache/pulsar/functions/worker/Utils.java | 17 - .../org/apache/pulsar/functions/worker/Worker.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 8 +- .../pulsar/functions/worker/WorkerService.java | 15 +++- 18 files changed, 375 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5089e95..328d6b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.Mockito.spy; @@ -27,6 +28,8 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -40,14 +43,17 @@ import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -87,7 +93,7 @@ public class PulsarSinkE2ETest { ServiceConfiguration config; WorkerConfig workerConfig; -URL url; +URL urlTls; PulsarService pulsar; PulsarAdmin admin; PulsarClient pulsarClient; @@ -102,8 +108,16 @@ public class PulsarSinkE2ETest { private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); +private final int brokerWebServiceTlsPort = PortManager.nextFreePort(); private final int brokerServicePort = PortManager.nextFreePort(); +private final int brokerServiceTlsPort = PortManager.nextFreePort(); private final int workerServicePort = PortManager.nextFreePort(); + +private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem"; +private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; +private final String TLS_CLIENT_CERT_FILE
[incubator-pulsar] branch master updated: Introduce kinesis sink on function (#1904)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4b37fda Introduce kinesis sink on function (#1904) 4b37fda is described below commit 4b37fdaeb1f0a286e591e44120da46d33a1c0cc5 Author: Rajan Dhabalia AuthorDate: Thu Jun 7 13:35:00 2018 -0700 Introduce kinesis sink on function (#1904) * Introduce kinesis sink on function add pulsarSinkE2E test * remove kinesis test and dep --- .../pulsar/broker/admin/impl/FunctionsBase.java| 2 +- .../org/apache/pulsar/io/PulsarSinkE2ETest.java| 293 + .../client/admin/internal/FunctionsImpl.java | 2 +- pulsar-functions/worker/pom.xml| 2 +- .../pulsar/functions/worker/FunctionActioner.java | 2 +- .../pulsar/functions/worker/WorkerConfig.java | 4 +- pulsar-io/kinesis/pom.xml | 125 + .../io/kinesis/AwsCredentialProviderPlugin.java| 60 + .../org/apache/pulsar/io/kinesis/KinesisSink.java | 250 ++ .../pulsar/io/kinesis/KinesisSinkConfig.java | 57 pulsar-io/pom.xml | 1 + 11 files changed, 792 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index a00a2dd..bacade6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -80,7 +80,7 @@ public class FunctionsBase extends AdminResource implements Supplierhttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.worker.Utils; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.WorkerServer; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; + +import io.netty.util.concurrent.DefaultThreadFactory; +import jersey.repackaged.com.google.common.collect.Lists; + +/** + * Test Pulsar sink o
[incubator-pulsar] branch master updated: Support pulsar source to start consumer with topic patterns (#1903)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 57f463d Support pulsar source to start consumer with topic patterns (#1903) 57f463d is described below commit 57f463d11604533e3d3215baaf43e874a73da03d Author: Rajan Dhabalia AuthorDate: Wed Jun 6 22:00:01 2018 -0700 Support pulsar source to start consumer with topic patterns (#1903) Pass sourceConfig.topicPattern from cmdSink add test for topic-patterns add topic-pattern to ProcessRunTime document topicsPattern usage for java-func only add python instance validaiton for topicPatterns args getSequenceId for topicMessage --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 25 +++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 78 -- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 8 +++ .../functions/instance/JavaInstanceRunnable.java | 1 + .../pulsar/functions/source/PulsarSource.java | 25 ++- .../functions/source/PulsarSourceConfig.java | 1 + .../src/main/python/python_instance_main.py| 3 + .../pulsar/functions/source/PulsarSourceTest.java | 21 ++ .../proto/src/main/proto/Function.proto| 1 + .../pulsar/functions/runtime/JavaInstanceMain.java | 8 +++ .../pulsar/functions/runtime/ProcessRuntime.java | 6 ++ .../pulsar/functions/runtime/RuntimeSpawner.java | 10 +++ .../functions/runtime/ProcessRuntimeTest.java | 7 +- .../pulsar/functions/utils/FunctionConfig.java | 2 + .../apache/pulsar/functions/utils/SinkConfig.java | 3 + .../org/apache/pulsar/functions/utils/Utils.java | 35 +- .../functions/utils/validation/ValidatorImpls.java | 7 +- 17 files changed, 169 insertions(+), 72 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 20499e4..5861e22 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -206,6 +206,31 @@ public class CmdFunctionsTest { } @Test +public void testCreateFunctionWithTopicPatterns() throws Exception { +String fnName = TEST_NAME + "-function"; +String topicPatterns = "persistent://tenant/ns/topicPattern*"; +String outputTopicName = TEST_NAME + "-output-topic"; +cmd.run(new String[] { +"create", +"--name", fnName, +"--topicsPattern", topicPatterns, +"--output", outputTopicName, +"--jar", "SomeJar.jar", +"--tenant", "sample", +"--namespace", "ns1", +"--className", DummyFunction.class.getName(), +}); + +CreateFunction creater = cmd.getCreater(); +assertEquals(fnName, creater.getFunctionName()); +assertEquals(topicPatterns, creater.getTopicsPattern()); +assertEquals(outputTopicName, creater.getOutput()); + +verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString()); + +} + +@Test public void testCreateWithoutTenant() throws Exception { String fnName = TEST_NAME + "-function"; String inputTopicName = "persistent://tenant/standalone/namespace/input-topic"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index a200de8..6915774 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -18,20 +18,28 @@ */ package org.apache.pulsar.admin.cli; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; -import com.beust.jcommander.Parameters; -import com.beust.jcommander.converters.StringConverter; -import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.isNull; +impo
[incubator-pulsar] branch master updated: fix func pkgUrl validation (#1925)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5f7e367 fix func pkgUrl validation (#1925) 5f7e367 is described below commit 5f7e367323d1ed4d79eaf23a166730395ec934b4 Author: Rajan Dhabalia AuthorDate: Wed Jun 6 14:07:16 2018 -0700 fix func pkgUrl validation (#1925) * fix func pkgUrl validation * add test-cases * move isFunctionPackageUrlSupported to utils --- .../pulsar/functions/worker/FunctionActioner.java | 7 +- .../org/apache/pulsar/functions/worker/Utils.java | 7 + .../functions/worker/rest/api/FunctionsImpl.java | 7 +- .../functions/worker/FunctionActionerTest.java | 147 + 4 files changed, 159 insertions(+), 9 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 86de82f..714cb50 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; import java.io.IOException; @@ -35,7 +36,6 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; -import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.isFunctionPackageUrlSupported; import java.io.File; import java.io.FileNotFoundException; @@ -104,7 +104,8 @@ public class FunctionActioner implements AutoCloseable { actioner.join(); } -private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { +@VisibleForTesting +protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); log.info("Starting function {} - {} ...", @@ -112,7 +113,7 @@ public class FunctionActioner implements AutoCloseable { File pkgFile = null; String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); -boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); +boolean isPkgUrlProvided = Utils.isFunctionPackageUrlSupported(pkgLocation); if(isPkgUrlProvided && pkgLocation.startsWith(Utils.FILE)) { pkgFile = new File(pkgLocation); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 586938c..51de807 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -34,6 +34,8 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.UUID; import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.AppendOnlyStreamWriter; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.DistributedLogManager; @@ -231,4 +233,9 @@ public final class Utils { String functionName, int instanceId) { return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId); } + +public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { +return StringUtils.isNotBlank(functionPkgUrl) +&& (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); +} } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index cb2da74..8ac5766 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -713,18 +713,13 @@ public class FunctionsImpl { private FunctionDetails validateUpdat
[incubator-pulsar] branch master updated: Fix: GLIBC_2.14 dep issue for protoc-3.5.0-linux-x86_64 (#1914)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 439f084 Fix: GLIBC_2.14 dep issue for protoc-3.5.0-linux-x86_64 (#1914) 439f084 is described below commit 439f084147065bec2005b2796cd50dc4aecb777e Author: Rajan Dhabalia AuthorDate: Tue Jun 5 13:29:44 2018 -0700 Fix: GLIBC_2.14 dep issue for protoc-3.5.0-linux-x86_64 (#1914) --- managed-ledger/pom.xml | 2 +- pom.xml| 1 + pulsar-broker/pom.xml | 2 +- pulsar-functions/proto/pom.xml | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 9ee8570..98f86b2 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -116,7 +116,7 @@ protobuf-maven-plugin ${protobuf-maven-plugin.version} - com.google.protobuf:protoc:${protobuf3.version}:exe:${os.detected.classifier} + com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier} true diff --git a/pom.xml b/pom.xml index b6be868..17674b6 100644 --- a/pom.xml +++ b/pom.xml @@ -141,6 +141,7 @@ flexible messaging model and an intuitive client API. 2.2.1.SP1 2.4.1 3.5.1 +3.5.1-1 1.12.0 1.0.0 2.8.2 diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index a23e76f..dd4118d 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -319,7 +319,7 @@ protobuf-maven-plugin ${protobuf-maven-plugin.version} - com.google.protobuf:protoc:${protobuf3.version}:exe:${os.detected.classifier} + com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier} true diff --git a/pulsar-functions/proto/pom.xml b/pulsar-functions/proto/pom.xml index 38fca3e..aedb447 100644 --- a/pulsar-functions/proto/pom.xml +++ b/pulsar-functions/proto/pom.xml @@ -61,7 +61,7 @@ protobuf-maven-plugin ${protobuf-maven-plugin.version} - com.google.protobuf:protoc:${protobuf3.version}:exe:${os.detected.classifier} + com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier} true grpc-java io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.version}:exe:${os.detected.classifier} -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Support function registration with package-url (#1902)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8444046 Support function registration with package-url (#1902) 8444046 is described below commit 844404630f01add24100b31ced3de255cf733ea9 Author: Rajan Dhabalia AuthorDate: Tue Jun 5 10:25:35 2018 -0700 Support function registration with package-url (#1902) * Support function registration with package-url * keep one admin api for createFunction --- pom.xml| 6 ++ .../pulsar/broker/admin/impl/FunctionsBase.java| 5 +- .../org/apache/pulsar/client/admin/Functions.java | 18 +++- .../client/admin/internal/FunctionsImpl.java | 17 pulsar-functions/worker/pom.xml| 6 ++ .../pulsar/functions/worker/FunctionActioner.java | 73 +-- .../org/apache/pulsar/functions/worker/Utils.java | 41 - .../functions/worker/rest/api/FunctionsImpl.java | 102 +++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 3 +- .../apache/pulsar/functions/worker/UtilsTest.java | 80 .../rest/api/v2/FunctionApiV2ResourceTest.java | 51 ++- 11 files changed, 338 insertions(+), 64 deletions(-) diff --git a/pom.xml b/pom.xml index 39a31de..b6be868 100644 --- a/pom.xml +++ b/pom.xml @@ -387,6 +387,12 @@ flexible messaging model and an intuitive client API. commons-configuration 1.6 + + +commons-io +commons-io +2.5 + net.jpountz.lz4 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index c22b611..a00a2dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -74,12 +74,13 @@ public class FunctionsBase extends AdminResource implements Supplier + * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * + * + * @param functionDetails + *the function configuration object + * @param pkgUrl + *url from which pkg can be downloaded + * @throws PulsarAdminException + */ +void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException; /** * Update the configuration for a function. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 1e2ae22..1a1ab60 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -122,6 +122,23 @@ public class FunctionsImpl extends BaseResource implements Functions { } @Override +public void createFunctionWithUrl(FunctionDetails functionDetails, String pkgUrl) throws PulsarAdminException { +try { +final FormDataMultiPart mp = new FormDataMultiPart(); + +mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + +mp.bodyPart(new FormDataBodyPart("functionDetails", +printJson(functionDetails), +MediaType.APPLICATION_JSON_TYPE)); + request(functions.path(functionDetails.getTenant()).path(functionDetails.getNamespace()).path(functionDetails.getName())) +.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); +} catch (Exception e) { +throw getApiException(e); +} +} + +@Override public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException { try { request(functions.path(cluster).path(namespace).path(function)) diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 7208861..4860ae7 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -97,6 +97,12 @@ org.apache.distributedlog distributedlog-core + + + commons-io + commons-io + + diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 4edee60..86de82f
[incubator-pulsar] branch master updated: Fix: remove local-cluster from replication list of global-namespace should clean topics (#1647)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9746ea4 Fix: remove local-cluster from replication list of global-namespace should clean topics (#1647) 9746ea4 is described below commit 9746ea44337044a4c5d4f744d4e84e31ef53d327 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Apr 25 22:57:38 2018 -0700 Fix: remove local-cluster from replication list of global-namespace should clean topics (#1647) --- .../broker/service/persistent/PersistentTopic.java | 130 ++--- .../broker/service/ReplicatorGlobalNSTest.java | 110 + 2 files changed, 201 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c34ec84..f425254 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -661,6 +661,21 @@ public class PersistentTopic implements Topic, AddEntryCallback { return delete(false); } +private CompletableFuture delete(boolean failIfHasSubscriptions) { +return delete(failIfHasSubscriptions, false); +} + +/** + * Forcefully close all producers/consumers/replicators and deletes the topic. this function is used when local + * cluster is removed from global-namespace replication list. Because broker doesn't allow lookup if local cluster + * is not part of replication cluster list. + * + * @return + */ +private CompletableFuture deleteForcefully() { +return delete(false, true); +} + /** * Delete the managed ledger associated with this topic * @@ -668,11 +683,14 @@ public class PersistentTopic implements Topic, AddEntryCallback { *Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to *false when called from admin API (it will delete the subs too), and set to true when called from GC *thread - * + * @param closeIfClientsConnected + *Flag indicate whether explicitly close connected producers/consumers/replicators before trying to delete topic. If + *any client is connected to a topic and if this flag is disable then this operation fails. + * * @return Completable future indicating completion of delete operation Completed exceptionally with: * IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails */ -private CompletableFuture delete(boolean failIfHasSubscriptions) { +private CompletableFuture delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -682,48 +700,73 @@ public class PersistentTopic implements Topic, AddEntryCallback { deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); return deleteFuture; } -if (USAGE_COUNT_UPDATER.get(this) == 0) { -isFenced = true; - + +CompletableFuture closeClientFuture = new CompletableFuture<>(); +if (closeIfClientsConnected) { List<CompletableFuture> futures = Lists.newArrayList(); +replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); +producers.forEach(producer -> futures.add(producer.disconnect())); +subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); +FutureUtil.waitForAll(futures).thenRun(() -> { +closeClientFuture.complete(null); +}).exceptionally(ex -> { +log.error("[{}] Error closing clients", topic, ex); +isFenced = false; +closeClientFuture.completeExceptionally(ex); +return null; +}); +} else { +closeClientFuture.complete(null); +} -if (failIfHasSubscriptions) { -if (!subscriptions.isEmpty()) { -isFenced = false; -deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions")); -return deleteFuture; -} -} else { -
[incubator-pulsar.wiki] branch master updated: Updated PIP 7: Pulsar Failure domain and Anti affinity namespaces (markdown)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 2e488b6 Updated PIP 7: Pulsar Failure domain and Anti affinity namespaces (markdown) 2e488b6 is described below commit 2e488b6791dbe770b76aadad59efb5e4ad494aa0 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Apr 13 19:21:36 2018 -0700 Updated PIP 7: Pulsar Failure domain and Anti affinity namespaces (markdown) --- PIP-7:-Pulsar-Failure-domain-and-Anti-affinity-namespaces.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PIP-7:-Pulsar-Failure-domain-and-Anti-affinity-namespaces.md b/PIP-7:-Pulsar-Failure-domain-and-Anti-affinity-namespaces.md index 656935e..1689cbb 100644 --- a/PIP-7:-Pulsar-Failure-domain-and-Anti-affinity-namespaces.md +++ b/PIP-7:-Pulsar-Failure-domain-and-Anti-affinity-namespaces.md @@ -1,4 +1,4 @@ - * **Status**: In-progress + * **Status**: Implemented * **Pull Request**: [#896](https://github.com/apache/incubator-pulsar/pull/896) * **Issue**: [#840](https://github.com/apache/incubator-pulsar/issues/840) -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Refactor pulsar function admin api location (#1508)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new acc3151 Refactor pulsar function admin api location (#1508) acc3151 is described below commit acc31519607a43524366eb536359be0c93abd12c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Apr 5 17:39:58 2018 -0700 Refactor pulsar function admin api location (#1508) * Refactor pulsar function admin api location --- pulsar-client-admin/pom.xml| 12 +++ .../org/apache/pulsar/client/admin/Functions.java | 4 +- .../apache/pulsar/client/admin/PulsarAdmin.java| 11 +++ .../client/admin/internal/FunctionsImpl.java | 26 -- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 22 ++--- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 103 - .../client/admin/PulsarAdminWithFunctions.java | 46 - .../admin/PulsarAdminWithFunctionsBuilderImpl.java | 29 -- pulsar-functions/pom.xml | 1 + pulsar-functions/proto-shaded/pom.xml | 93 +++ pulsar-functions/utils/pom.xml | 4 + 11 files changed, 214 insertions(+), 137 deletions(-) diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index df4f494..e0665b1 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -46,6 +46,18 @@ ${project.version} + + ${project.groupId} + pulsar-functions-proto-shaded + ${project.version} + + + * + * + + + + org.glassfish.jersey.core jersey-client diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java similarity index 96% rename from pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 55ba089..3808a48 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -21,8 +21,8 @@ package org.apache.pulsar.client.admin; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; -import org.apache.pulsar.functions.proto.Function.FunctionConfig; -import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig; +import org.apache.pulsar.functions.shaded.proto.InstanceCommunication.FunctionStatusList; import java.util.List; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index 7bb23b9..a1a67a2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.internal.BrokerStatsImpl; import org.apache.pulsar.client.admin.internal.BrokersImpl; import org.apache.pulsar.client.admin.internal.ClustersImpl; +import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.admin.internal.JacksonConfigurator; import org.apache.pulsar.client.admin.internal.LookupImpl; import org.apache.pulsar.client.admin.internal.NamespacesImpl; @@ -76,6 +77,7 @@ public class PulsarAdmin implements Closeable { private final Client client; private final String serviceUrl; private final Lookup lookups; +private final Functions functions; protected final WebTarget root; protected final Authentication auth; @@ -170,6 +172,7 @@ public class PulsarAdmin implements Closeable { this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth); this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.lookups = new LookupImpl(root, auth, useTls); +this.functions = new FunctionsImpl(root, auth); } /** @@ -304,6 +307,14 @@ public class PulsarAdmin implements Closeable { } /** + * + * @return the functions management object + */ +public Functions functions() { +return functions; +} + +/** * @return the broker statics */ public BrokerStats brokerStats() { diff --git a/pulsar-client-tools/sr
[incubator-pulsar] branch master updated: GC improvement: Reduce string objects from Publisher and Consumer stats (#1480)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new ea8c477 GC improvement: Reduce string objects from Publisher and Consumer stats (#1480) ea8c477 is described below commit ea8c47738007674657db89e1be1eb1efe3077658 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Apr 4 14:08:37 2018 -0700 GC improvement: Reduce string objects from Publisher and Consumer stats (#1480) --- .../org/apache/pulsar/broker/service/Consumer.java | 6 +- .../org/apache/pulsar/broker/service/Producer.java | 8 +-- .../apache/pulsar/broker/service/ServerCnx.java| 4 +- .../service/nonpersistent/NonPersistentTopic.java | 12 ++-- .../broker/service/persistent/PersistentTopic.java | 16 ++--- .../apache/pulsar/broker/admin/AdminApiTest2.java | 47 + .../pulsar/broker/service/BrokerServiceTest.java | 4 +- .../pulsar/common/policies/data/ConsumerStats.java | 65 -- .../common/policies/data/PublisherStats.java | 79 -- .../common/policies/data/ConsumerStatsTest.java| 65 ++ .../common/policies/data/PublisherStatsTest.java | 75 11 files changed, 345 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index ef95399..738205f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -130,10 +130,10 @@ public class Consumer { this.metadata = metadata != null ? metadata : Collections.emptyMap(); stats = new ConsumerStats(); -stats.address = cnx.clientAddress().toString(); +stats.setAddress(cnx.clientAddress().toString()); stats.consumerName = consumerName; -stats.connectedSince = DateFormatter.now(); -stats.clientVersion = cnx.getClientVersion(); +stats.setConnectedSince(DateFormatter.now()); +stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; if (subType == SubType.Shared) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 2088035..eb087bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -98,10 +98,10 @@ public class Producer { this.metadata = metadata != null ? metadata : Collections.emptyMap(); this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats(); -stats.address = cnx.clientAddress().toString(); -stats.connectedSince = DateFormatter.now(); -stats.clientVersion = cnx.getClientVersion(); -stats.producerName = producerName; +stats.setAddress(cnx.clientAddress().toString()); +stats.setConnectedSince(DateFormatter.now()); +stats.setClientVersion(cnx.getClientVersion()); +stats.setProducerName(producerName); stats.producerId = producerId; stats.metadata = this.metadata; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c052e4c..0bc311f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -437,8 +437,8 @@ public class ServerCnx extends PulsarHandler { commandConsumerStatsResponseBuilder.setAvailablePermits(consumerStats.availablePermits); commandConsumerStatsResponseBuilder.setUnackedMessages(consumerStats.unackedMessages); commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs); -commandConsumerStatsResponseBuilder.setAddress(consumerStats.address); - commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince); + commandConsumerStatsResponseBuilder.setAddress(consumerStats.getAddress()); + commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.getConnectedSince()); Subscription subscription = consumer.getSubscription(); commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopi
[incubator-pulsar] branch master updated: Fix JavaSerDeTest package name (#1406)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9e874cb Fix JavaSerDeTest package name (#1406) 9e874cb is described below commit 9e874cbc3f0fbc4b3d30a66e0e7f7a5a749ac4b9 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Mar 19 13:53:12 2018 -0700 Fix JavaSerDeTest package name (#1406) --- .../java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java index 5f43e3c..36e6bfc 100644 --- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java +++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.runtime.serde; +package org.apache.pulsar.functions.api.utils; import static org.testng.Assert.assertEquals; @@ -29,7 +29,7 @@ import org.apache.pulsar.functions.api.utils.JavaSerDe; import org.testng.annotations.Test; /** - * Unit test of {@link JavaSerDeTest}. + * Unit test of {@link JavaSerDe}. */ public class JavaSerDeTest { -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Broker should not start replicator for root partitioned-topic (#1262)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new f38a003 Broker should not start replicator for root partitioned-topic (#1262) f38a003 is described below commit f38a003a2a8331da94996fc7ea871abfc779ae24 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Feb 22 18:30:58 2018 -0800 Broker should not start replicator for root partitioned-topic (#1262) * Broker should not start replicator for root partitioned-topic * address comment --- .../pulsar/broker/service/AbstractReplicator.java | 51 +-- .../pulsar/broker/service/BrokerService.java | 53 +--- .../nonpersistent/NonPersistentReplicator.java | 3 +- .../service/nonpersistent/NonPersistentTopic.java | 35 +-- .../service/persistent/PersistentReplicator.java | 3 +- .../broker/service/persistent/PersistentTopic.java | 39 ++-- .../pulsar/broker/service/ReplicatorTest.java | 72 ++ .../pulsar/broker/service/ReplicatorTestBase.java | 3 + 8 files changed, 220 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 49213c9..4642a85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,17 +18,21 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.AbstractReplicator.State; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.DestinationName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +61,9 @@ public abstract class AbstractReplicator { Stopped, Starting, Started, Stopping } -public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, -String remoteCluster, BrokerService brokerService) { +public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, +BrokerService brokerService) throws NamingException { +validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; @@ -69,8 +74,7 @@ public abstract class AbstractReplicator { this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerBuilder = client.newProducer() // -.topic(topicName) -.sendTimeout(0, TimeUnit.SECONDS) // +.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); @@ -211,5 +215,42 @@ public abstract class AbstractReplicator { return (replicatorPrefix + "." + cluster).intern(); } +/** + * Replication can't be started on root-partitioned-topic to avoid producer startup conflict. + * + * + * eg: + * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then + * broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2". + * + * However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual + * producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing + * replicator producers. + * + * + * Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned + * producers. + * + * @param topicName + * @param brokerService + */ +private void validatePartitionedTopic(Stri
[incubator-pulsar] branch master updated: Add get-peer clusters admin api (#1268)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7b07fce Add get-peer clusters admin api (#1268) 7b07fce is described below commit 7b07fce3ccf633f705b5bbe1de3248c5abdcac10 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Feb 22 11:06:19 2018 -0800 Add get-peer clusters admin api (#1268) --- .../pulsar/broker/admin/impl/ClustersBase.java | 22 + .../pulsar/broker/service/PeerReplicatorTest.java | 19 ++ .../org/apache/pulsar/client/admin/Clusters.java | 23 ++ .../pulsar/client/admin/internal/ClustersImpl.java | 10 ++ .../org/apache/pulsar/admin/cli/CmdClusters.java | 14 + .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +++ 6 files changed, 91 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 733105a..de670c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -213,6 +213,28 @@ public class ClustersBase extends AdminResource { } } + @GET + @Path("/{cluster}/peers") + @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = Set.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public Set getPeerCluster(@PathParam("cluster") String cluster) { + validateSuperUserAccess(); + + try { + String clusterPath = path("clusters", cluster); + byte[] content = globalZk().getData(clusterPath, null, null); + ClusterData clusterData = jsonMapper().readValue(content, ClusterData.class); + return clusterData.getPeerClusterNames(); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to get cluster {}: Does not exist", clientAppId(), cluster); + throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + } + @DELETE @Path("/{cluster}") @ApiOperation(value = "Delete an existing cluster") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index c344af5..d5456fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -151,6 +154,22 @@ public class PeerReplicatorTest extends ReplicatorTestBase { } + @Test + public void testGetPeerClusters() throws Exception { + final String mainClusterName = "r1"; + assertEquals(admin1.clusters().getPeerClusterNames(mainClusterName), null); + LinkedHashSet peerClusters = Sets.newLinkedHashSet(Lists.newArrayList("r2", "r3")); + admin1.clusters().updatePeerClusterNames(mainClusterName, peerClusters); + retryStrategically((test) -> { + try { + return admin1.clusters().getPeerClusterNames(mainClusterName).size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 100); + assertEquals(admin1.clusters().getPeerCl
[incubator-pulsar] branch master updated: Add REST api to check host-status for adding/removing from vip (#1241)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new c03943a Add REST api to check host-status for adding/removing from vip (#1241) c03943a is described below commit c03943ae8bdd597b2619766a96233e4a35f49344 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Feb 19 17:44:41 2018 -0800 Add REST api to check host-status for adding/removing from vip (#1241) * Add REST api to check host-status for adding/removing from vip * move vipStatus to common place * address comment --- conf/proxy.conf| 4 conf/websocket.conf| 4 pom.xml| 6 + pulsar-broker-common/pom.xml | 6 + .../pulsar/common/configuration}/VipStatus.java| 27 ++ .../org/apache/pulsar/broker/PulsarService.java| 25 +--- .../org/apache/pulsar/broker/web/WebService.java | 13 +++ .../pulsar/proxy/server/ProxyConfiguration.java| 12 ++ .../pulsar/proxy/server/ProxyServiceStarter.java | 11 + .../org/apache/pulsar/proxy/server/WebServer.java | 19 +++ .../pulsar/websocket/service/ProxyServer.java | 6 ++--- .../service/WebSocketProxyConfiguration.java | 12 ++ .../websocket/service/WebSocketServiceStarter.java | 6 - 13 files changed, 120 insertions(+), 31 deletions(-) diff --git a/conf/proxy.conf b/conf/proxy.conf index d7c5afc..0939452 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -38,6 +38,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + ### --- Authentication --- ### # Enable authentication diff --git a/conf/websocket.conf b/conf/websocket.conf index 404bdef..399efed 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -36,6 +36,10 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls=8443 +# Path for the file used to determine the rotation status for the proxy-instance when responding +# to service discovery health checks +statusFilePath= + # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 diff --git a/pom.xml b/pom.xml index 192769f..eb97e0b 100644 --- a/pom.xml +++ b/pom.xml @@ -374,6 +374,12 @@ flexible messaging model and an intuitive client API. jersey-container-servlet 2.23.2 + + +javax.ws.rs +javax.ws.rs-api +2.0.1 + org.glassfish.jersey.media diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 666fec8..75edadb 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -48,5 +48,11 @@ javax.servlet javax.servlet-api + + + javax.ws.rs + javax.ws.rs-api + + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java similarity index 64% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java rename to pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 1e2499c..5dfba94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.web; +package org.apache.pulsar.common.configuration; import java.io.File; +import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.WebApplicationException; @@ -27,22 +28,28 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; /** - * Web resource used by the VIP service to check to availability of the Pulsar broker instance. + * Web resource used by the VIP service to check to availability of the service instance. */ @Path("/status.html") -@NoSwaggerDocumentation -public class VipStatus extends PulsarWebResource { +public class VipStatus { + +public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + +@Context +protected ServletContext servletContext; @GET @Context public String checkStatus() { -String statusFilePath = pulsar().getStatusFilePath(); -File statusFile = new File(statusFilePath); -
[incubator-pulsar] branch branch-1.22 updated: add httpcore dep required by httpclient in hostname-verification (#1239)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch branch-1.22 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-1.22 by this push: new 96d720c add httpcore dep required by httpclient in hostname-verification (#1239) 96d720c is described below commit 96d720c928b21485df8e055dde379aeeb7f81624 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Feb 14 16:29:37 2018 -0800 add httpcore dep required by httpclient in hostname-verification (#1239) --- all/src/assemble/LICENSE.bin.txt | 1 + pulsar-broker-shaded/pom.xml | 7 +++ pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml | 9 + pulsar-client-shaded/pom.xml | 7 +++ pulsar-client/pom.xml | 7 ++- 5 files changed, 30 insertions(+), 1 deletion(-) diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt index a7e70de..944ba2d 100644 --- a/all/src/assemble/LICENSE.bin.txt +++ b/all/src/assemble/LICENSE.bin.txt @@ -333,6 +333,7 @@ The Apache Software License, Version 2.0 * SnakeYaml -- org.yaml-snakeyaml-*.jar * RocksDB - org.rocksdb.*.jar * HttpClient - org.apache.httpcomponents.httpclient.jar + * HttCore - org.apache.httpcomponents.httpcore.jar * CommonsLogging - commons-logging-*.jar BSD 3-clause "New" or "Revised" License diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index 5794983..6d552bf 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -106,6 +106,7 @@ com.wordnik:swagger-annotations org.apache.httpcomponents:httpclient commons-logging:commons-logging + org.apache.httpcomponents:httpcore @@ -121,6 +122,12 @@ ** + + commons-logging:commons-logging + + ** + + diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml index fe601bf..ea4b71c 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml @@ -127,8 +127,17 @@ com.yahoo.datasketches:sketches-core org.apache.httpcomponents:httpclient commons-logging:commons-logging + org.apache.httpcomponents:httpcore + + + commons-logging:commons-logging + + ** + + + org.apache.kafka.clients.producer.KafkaProducer diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index bebc36d..81f873e 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -83,6 +83,7 @@ com.yahoo.datasketches:sketches-core org.apache.httpcomponents:httpclient commons-logging:commons-logging + org.apache.httpcomponents:httpcore @@ -98,6 +99,12 @@ ** + + commons-logging:commons-logging + + ** + + diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index df6cc8d..abd0300 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -86,12 +86,17 @@ - + commons-logging commons-logging 1.1.1 + + org.apache.httpcomponents + httpcore + 4.4.9 + -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Add non-persistent topic stats separately in brokers-stat (#1235)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3c9e281 Add non-persistent topic stats separately in brokers-stat (#1235) 3c9e281 is described below commit 3c9e28172d486bfb4c39db5248e1033d5079c48f Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Feb 14 12:03:38 2018 -0800 Add non-persistent topic stats separately in brokers-stat (#1235) --- .../apache/pulsar/broker/service/PulsarStats.java | 45 +- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index cdc5ada..835cabc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -55,6 +56,7 @@ public class PulsarStats implements Closeable { private Map<String, NamespaceBundleStats> bundleStats; private List tempMetricsCollection; private List metricsCollection; +private List tempNonPersistentTopics; private final BrokerOperabilityMetrics brokerOperabilityMetrics; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); @@ -71,6 +73,7 @@ public class PulsarStats implements Closeable { this.metricsCollection = Lists.newArrayList(); this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress()); +this.tempNonPersistentTopics = Lists.newArrayList(); } @Override @@ -118,22 +121,46 @@ public class PulsarStats implements Closeable { currentBundleStats.topics = topics.size(); topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle)); + +tempNonPersistentTopics.clear(); +// start persistent topic topicStatsStream.startObject("persistent"); topics.forEach((name, topic) -> { -try { -topic.updateRates(nsStats, currentBundleStats, topicStatsStream, -clusterReplicationMetrics, namespaceName); -} catch (Exception e) { -log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); -} -// this task: helps to activate inactive-backlog-cursors which have caught up and -// connected, also deactivate active-backlog-cursors which has backlog if (topic instanceof PersistentTopic) { +try { +topic.updateRates(nsStats, currentBundleStats, topicStatsStream, +clusterReplicationMetrics, namespaceName); +} catch (Exception e) { +log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); +} +// this task: helps to activate inactive-backlog-cursors which have caught up and +// connected, also deactivate active-backlog-cursors which has backlog ((PersistentTopic) topic).getManagedLedger().checkBackloggedCursors(); +}else if (topic instanceof NonPersistentTopic) { + tempNonPersistentTopics.add((NonPersistentTopic) topic); +} else { +log.warn("Unsupported type of topic {}", topic.getClass().getName()); } }); - +// end persistent topics section topicStatsStream.endObject(); + +if(!tempNonPersistentTopics.isEmpty()) { + // start non-persistent topic +
[incubator-pulsar] branch master updated: Added debug logs in MessageCrypto (#1233)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new fbb42e7 Added debug logs in MessageCrypto (#1233) fbb42e7 is described below commit fbb42e711aa9044ec5a1b30f448173432c1d805a Author: Andrews <sahaya.andr...@gmail.com> AuthorDate: Tue Feb 13 18:12:13 2018 -0800 Added debug logs in MessageCrypto (#1233) --- pulsar-client-cpp/lib/MessageCrypto.cc | 101 + pulsar-client-cpp/lib/MessageCrypto.h | 1 + 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc index 26f1b48..0cc5dec 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.cc +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, unsigned char keyDigest[], unsigned int& digestLen) { if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { -LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName); return false; } digestLen = 0; if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { -LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName); return false; } if (EVP_DigestFinal_ex(mdCtx_, keyDigest, ) != 1) { -LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName); return false; } @@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() { } } +std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) { +static const char* hexVals = "0123456789ABCDEF"; + +std::string outHex; +outHex.reserve(2 * len + 2); +outHex.push_back('0'); +outHex.push_back('x'); +for (size_t i = 0; i < len; ++i) { +const unsigned char c = inputStr[i]; +outHex.push_back(hexVals[c >> 4]); +outHex.push_back(hexVals[c & 15]); +} +return outHex; +} + Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, const CryptoKeyReaderPtr keyReader) { Lock lock(mutex_); @@ -141,7 +156,7 @@ Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { if (keyName.empty()) { -LOG_ERROR(logCtx_ + "Keyname is empty "); +LOG_ERROR(logCtx_ << "Keyname is empty "); return ResultCryptoError; } @@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt EncryptionKeyInfo keyInfo; Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); if (result != ResultOk) { -LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName); return result; } RSA* pubKey = loadPublicKey(keyInfo.getKey()); if (pubKey == NULL) { -LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); +LOG_ERROR(logCtx_ << "Failed to load public key " << keyName); return ResultCryptoError; } +LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully."); int inSize = RSA_size(pubKey); boost::scoped_array encryptedKey(new unsigned char[inSize]); @@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); if (inSize != outSize) { -LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); +LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName); return ResultCryptoError; } std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize); @@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, con
[incubator-pulsar] branch master updated: Support hostname verification on proxy to broker connection (#1214)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new a27a1e2 Support hostname verification on proxy to broker connection (#1214) a27a1e2 is described below commit a27a1e2dd669bc2082cf8f8cefb30becf56fa616 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Sun Feb 11 09:01:55 2018 -0800 Support hostname verification on proxy to broker connection (#1214) * Support hostname verification on proxy to broker connection * remove dep and rename config --- conf/proxy.conf| 3 + .../pulsar/proxy/server/DirectProxyHandler.java| 42 +++- .../pulsar/proxy/server/ProxyConfiguration.java| 10 +++ .../server/ProxyWithProxyAuthorizationTest.java| 75 +++--- 4 files changed, 121 insertions(+), 9 deletions(-) diff --git a/conf/proxy.conf b/conf/proxy.conf index dafcfc4..78ad925 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -74,3 +74,6 @@ tlsCertificateFilePath= # Path for the TLS private key file tlsKeyFilePath= + +# Validates hostname when proxy creates tls connection with broker +tlsHostnameVerificationEnabled=false diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 9fc2ff5..41bf20c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -24,6 +24,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.cert.X509Certificate; +import javax.net.ssl.SSLSession; + import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.common.api.Commands; @@ -37,16 +39,19 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; public class DirectProxyHandler { @@ -104,7 +109,7 @@ public class DirectProxyHandler { } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); -ch.pipeline().addLast(new ProxyBackendHandler()); +ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config)); } }); @@ -124,7 +129,10 @@ public class DirectProxyHandler { if (!future.isSuccess()) { // Close the connection if the connection attempt has failed. inboundChannel.close(); +return; } +final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline().get("proxyOutboundHandler"); +cnx.setRemoteHostName(targetBroker.getHost()); }); } @@ -135,9 +143,17 @@ public class DirectProxyHandler { public class ProxyBackendHandler extends PulsarDecoder implements FutureListener { private BackendState state = BackendState.Init; +private String remoteHostName; +protected ChannelHandlerContext ctx; +private ProxyConfiguration config; + +public ProxyBackendHandler(ProxyConfiguration config) { +this.config = config; +} @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { +this.ctx = ctx; // Send the Connect command to broker String authData = ""; if (authentication.getAuthData().hasDataFromCommand()) { @@ -195,6 +211,15 @@ public class DirectProxyHandler { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel); } + +if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null +&& !verifyTlsHostName(remoteHostName, ctx)) { +
[incubator-pulsar] branch master updated: Add hostname-verification at client tls connection (#1208)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8d3ab43 Add hostname-verification at client tls connection (#1208) 8d3ab43 is described below commit 8d3ab43cee86c9e49a54db13929a4ecb09e8152f Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Feb 9 17:43:23 2018 -0800 Add hostname-verification at client tls connection (#1208) * Add hostname-verification at client tls connection * add httpclient dep with exclude all + add pem in apache-rat * add httpclient+commons-logging dep in client-shading and LICENSE * shade artifacts * fix: proxy send certs to client for host verification --- all/src/assemble/LICENSE.bin.txt | 2 + pom.xml| 13 ++ pulsar-broker-shaded/pom.xml | 6 + .../broker/service/PulsarChannelInitializer.java | 14 ++ .../AuthenticationTlsHostnameVerificationTest.java | 255 + .../tls/hn-verification/broker-cert.pem| 82 +++ .../tls/hn-verification/broker-key.pem | 28 +++ .../authentication/tls/hn-verification/cacert.pem | 79 +++ .../pulsar-client-kafka/pom.xml| 6 + pulsar-client-shaded/pom.xml | 6 + pulsar-client/pom.xml | 18 ++ .../pulsar/client/api/ClientConfiguration.java | 18 ++ .../org/apache/pulsar/client/impl/ClientCnx.java | 54 - .../apache/pulsar/client/impl/ConnectionPool.java | 2 + .../proxy/server/ServiceChannelInitializer.java| 14 ++ .../server/ProxyWithProxyAuthorizationTest.java| 51 - 16 files changed, 642 insertions(+), 6 deletions(-) diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt index 41f9000..a7e70de 100644 --- a/all/src/assemble/LICENSE.bin.txt +++ b/all/src/assemble/LICENSE.bin.txt @@ -332,6 +332,8 @@ The Apache Software License, Version 2.0 * Jetty - org.eclipse.jetty-*.jar * SnakeYaml -- org.yaml-snakeyaml-*.jar * RocksDB - org.rocksdb.*.jar + * HttpClient - org.apache.httpcomponents.httpclient.jar + * CommonsLogging - commons-logging-*.jar BSD 3-clause "New" or "Revised" License * EA Agent Loader -- com.ea.agentloader-*.jar -- licenses/LICENSE-EA-Agent-Loader.txt diff --git a/pom.xml b/pom.xml index 27ff691..7320661 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,18 @@ flexible messaging model and an intuitive client API. +org.apache.httpcomponents +httpclient +4.5.5 + + +* +* + + + + + org.testng testng 6.13.1 @@ -760,6 +772,7 @@ flexible messaging model and an intuitive client API. **/*.crt **/*.key **/*.csr +**/*.pem **/*.json **/*.htpasswd src/test/resources/athenz.conf.test diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index fd3ff68..bda3037 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -104,6 +104,8 @@ org.aspectj:* com.ea.agentloader:* com.wordnik:swagger-annotations + org.apache.httpcomponents:httpclient + commons-logging:commons-logging @@ -298,6 +300,10 @@ com.wordnik org.apache.pulsar.shade.com.worknik + + org.apache.http + org.apache.pulsar.shade.org.apache.http + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index cd0415a..3138769 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,8 +19,11 @@ package org.apache.pulsar.broker.service; import java.io.File; +import java.security.cert.X509Certificate; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.impl.auth.AuthenticationDataTls; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.PulsarDecoder; @@ -68,6 +71,17 @@ public class PulsarChannelInitializer extends ChannelInitializer builder.trustM
[incubator-pulsar] branch master updated: Add connection timeout for binary lookup request (#1184)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new b7e2e7c Add connection timeout for binary lookup request (#1184) b7e2e7c is described below commit b7e2e7cab6bbac3f8670a333547abdf77fa5d24c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Feb 7 18:59:19 2018 -0800 Add connection timeout for binary lookup request (#1184) * Add connection timeout for binary lookup request * operation timeout * fix test --- .../client/impl/BrokerClientIntegrationTest.java | 2 - .../org/apache/pulsar/client/impl/ClientCnx.java | 21 +++ .../apache/pulsar/client/impl/ClientCnxTest.java | 64 ++ 3 files changed, 85 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 6affa58..342e232 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -583,7 +583,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { final int concurrentLookupRequests = 20; ClientConfiguration clientConf = new ClientConfiguration(); clientConf.setMaxNumberOfRejectedRequestPerConnection(0); -clientConf.setOperationTimeout(1, TimeUnit.MILLISECONDS); clientConf.setStatsInterval(0, TimeUnit.SECONDS); stopBroker(); pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(1); @@ -595,7 +594,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { clientConf2.setStatsInterval(0, TimeUnit.SECONDS); clientConf2.setIoThreads(concurrentLookupRequests); clientConf2.setConnectionsPerBroker(20); -clientConf2.setOperationTimeout(1, TimeUnit.MILLISECONDS); pulsarClient2 = (PulsarClientImpl) PulsarClient.create(lookupUrl, clientConf2); ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 5ad4c66..3f2d176 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -60,6 +60,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.Errors.NativeIoException; import io.netty.util.concurrent.Promise; +import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; public class ClientCnx extends PulsarHandler { @@ -83,6 +84,7 @@ public class ClientCnx extends PulsarHandler { private volatile int numberOfRejectRequests = 0; private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; +private final long operationTimeoutMs; private String proxyToTargetBrokerAddress = null; @@ -96,6 +98,7 @@ public class ClientCnx extends PulsarHandler { this.authentication = conf.getAuthentication(); this.eventLoopGroup = eventLoopGroup; this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection(); +this.operationTimeoutMs = conf.getOperationTimeoutMs(); this.state = State.None; } @@ -268,6 +271,12 @@ public class ClientCnx extends PulsarHandler { CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId); if (requestFuture != null) { +if (requestFuture.isCompletedExceptionally()) { +if (log.isDebugEnabled()) { +log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId()); +} +return; +} // Complete future with exception if : Result.response=fail/null if (!lookupResult.hasResponse() || CommandLookupTopicResponse.LookupType.Failed.equals(lookupResult.getResponse())) { @@ -297,6 +306,12 @@ public class ClientCnx extends PulsarHandler { CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId); if (requestFuture != null) { +if (requestFuture.isCompletedExceptionally()) { +if (log.isDebugEnabled()) { +log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId()); +
[incubator-pulsar] branch master updated: Add api to get list of non-persistent topics (#1114)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4e0812b Add api to get list of non-persistent topics (#1114) 4e0812b is described below commit 4e0812bcaa4ad7a7f233cef307fe761bb787edc2 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Jan 25 23:22:49 2018 -0800 Add api to get list of non-persistent topics (#1114) --- .../pulsar/broker/admin/NonPersistentTopics.java | 106 + .../apache/pulsar/broker/admin/AdminApiTest2.java | 25 + .../pulsar/client/admin/NonPersistentTopics.java | 39 .../admin/internal/NonPersistentTopicsImpl.java| 67 - .../pulsar/admin/cli/CmdNonPersistentTopics.java | 30 ++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 6 ++ 6 files changed, 271 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java index f4429a8..f765934 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java @@ -19,8 +19,13 @@ package org.apache.pulsar.broker.admin; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.util.Codec.decode; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; @@ -32,18 +37,26 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.naming.DestinationDomain; import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -147,7 +160,100 @@ public class NonPersistentTopics extends PersistentTopics { } unloadTopic(dn, authoritative); } + +@GET +@Path("/{property}/{cluster}/{namespace}") +@ApiOperation(value = "Get the list of non-persistent topics under a namespace.", response = String.class, responseContainer = "List") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 404, message = "Namespace doesn't exist") }) +public List getList(@PathParam("property") String property, @PathParam("cluster") String cluster, +@PathParam("namespace") String namespace) { +log.info("[{}] list of topics on namespace {}/{}/{}/{}", clientAppId(), property, cluster, namespace); +validateAdminAccessOnProperty(property); +Policies policies = getNamespacePolicies(property, cluster, namespace); +NamespaceName nsName = NamespaceName.get(property, cluster, namespace); + +if (!cluster.equals(Namespaces.GLOBAL_CLUSTER)) { +validateClusterOwnership(cluster); +validateClusterForProperty(property, cluster); +} else { +// check cluster ownership for a given global namespace: redirect if peer-cluster owns it +validateGlobalNamespaceOwnership(nsName); +} +final List<CompletableFuture<List>> futures = Lists.newArrayList(); +final List boundaries = policies.bundles.getBoundaries(); +for (int i = 0; i < boundaries.size() - 1; i++) { +final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); +try { + futures.ad
[incubator-pulsar] branch master updated: Advertise localhost address for unit test (#1105)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 72432ce Advertise localhost address for unit test (#1105) 72432ce is described below commit 72432ce7bb6ebbccb104147f3110a14f51c0a04c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Jan 24 21:00:54 2018 -0800 Advertise localhost address for unit test (#1105) * Advertise localhost address for unit test * fix test --- .../src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java | 1 + .../org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 1 + .../java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java | 1 + .../org/apache/pulsar/broker/service/AdvertisedAddressTest.java | 1 + .../org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java | 1 + .../org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java| 1 + .../java/org/apache/pulsar/broker/service/ReplicatorTestBase.java | 3 +++ .../src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java | 1 + .../java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java | 6 ++ .../java/org/apache/pulsar/client/api/NonPersistentTopicTest.java | 3 +++ 10 files changed, 19 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 73071ff..8e7db9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -88,6 +88,7 @@ public class SLAMonitoringTest { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerServicePort(brokerNativeBrokerPorts[i]); config.setClusterName("my-cluster"); +config.setAdvertisedAddress("localhost"); config.setWebServicePort(brokerWebServicePorts[i]); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setBrokerServicePort(brokerNativeBrokerPorts[i]); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 57ed5e5..c23d726 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -89,6 +89,7 @@ public abstract class MockedPulsarServiceBaseTest { this.conf = new ServiceConfiguration(); this.conf.setBrokerServicePort(BROKER_PORT); this.conf.setBrokerServicePortTls(BROKER_PORT_TLS); +this.conf.setAdvertisedAddress("localhost"); this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT); this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); this.conf.setClusterName(configClusterName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 8511e40..80ca562 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -137,6 +137,7 @@ public class LoadBalancerTest { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerServicePort(brokerNativeBrokerPorts[i]); config.setClusterName("use"); +config.setAdvertisedAddress("localhost"); config.setWebServicePort(brokerWebServicePorts[i]); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setBrokerServicePort(brokerNativeBrokerPorts[i]); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 266354f..dda76b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -50,6 +50,7 @@ public class AdvertisedAddressTest { config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setWebServicePort(BROKER_WEBSERVICE_PORT); config.setClusterName("usc"); +config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(BROKER_SERVICE_PORT);
[incubator-pulsar] branch master updated: Fix: ignore auth flag for websocket-proxy to broker authentication (#1106)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 574ebde Fix: ignore auth flag for websocket-proxy to broker authentication (#1106) 574ebde is described below commit 574ebdebaf0df57da7ee3f3fd19496124ae8cae7 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Jan 24 21:00:43 2018 -0800 Fix: ignore auth flag for websocket-proxy to broker authentication (#1106) --- .../src/main/java/org/apache/pulsar/websocket/WebSocketService.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 28a0ed5..02a589f 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -184,7 +184,8 @@ public class WebSocketService implements Closeable { clientConf.setIoThreads(config.getWebSocketNumIoThreads()); clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker()); -if (config.isAuthenticationEnabled()) { +if (isNotBlank(config.getBrokerClientAuthenticationPlugin()) +&& isNotBlank(config.getBrokerClientAuthenticationParameters())) { clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()); } -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.
[incubator-pulsar] branch master updated: Introduce config to skip non-recoverable data-ledger (#1046)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new f5a2ec7 Introduce config to skip non-recoverable data-ledger (#1046) f5a2ec7 is described below commit f5a2ec74e834e50f1970927f8fa6c107f7a3453c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Jan 24 13:38:39 2018 -0800 Introduce config to skip non-recoverable data-ledger (#1046) --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../bookkeeper/mledger/ManagedLedgerConfig.java| 15 +++ .../bookkeeper/mledger/ManagedLedgerException.java | 13 +++ .../bookkeeper/mledger/impl/EntryCacheImpl.java| 11 +- .../bookkeeper/mledger/impl/EntryCacheManager.java | 3 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 +++-- .../mledger/impl/ManagedLedgerFactoryImpl.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 50 +++-- .../bookkeeper/mledger/impl/OpReadEntry.java | 25 - .../apache/pulsar/broker/ServiceConfiguration.java | 12 ++ .../pulsar/broker/service/BrokerService.java | 26 + .../broker/service/BrokerBkEnsemblesTests.java | 125 - site/_data/config/broker.yaml | 3 + site/_data/config/standalone.yaml | 2 + 15 files changed, 287 insertions(+), 32 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 20da0a9..d983b3f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -300,6 +300,9 @@ managedLedgerMaxUnackedRangesToPersist=1 # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 +# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets +# corrupted at bookkeeper and managed-cursor is stuck at that ledger. +autoSkipNonRecoverableData=false ### --- Load balancer --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index ce35f2e..ec400fb 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -266,6 +266,9 @@ managedLedgerMaxUnackedRangesToPersist=1 # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 +# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets +# corrupted at bookkeeper and managed-cursor is stuck at that ledger. +autoSkipNonRecoverableData=false ### --- Load balancer --- ### diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 391a484..6f9847b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -51,6 +51,7 @@ public class ManagedLedgerConfig { private double throttleMarkDelete = 0; private long retentionTimeMs = 0; private long retentionSizeInMB = 0; +private boolean autoSkipNonRecoverableData; private DigestType digestType = DigestType.MAC; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -354,6 +355,20 @@ public class ManagedLedgerConfig { } /** + * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets + * corrupted at bookkeeper and managed-cursor is stuck at that ledger. + * + * @param autoSkipNonRecoverableData + */ +public boolean isAutoSkipNonRecoverableData() { +return autoSkipNonRecoverableData; +} + +public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) { +this.autoSkipNonRecoverableData = skipNonRecoverableData; +} + +/** * @return max unacked message ranges that will be persisted and recovered. * */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 1817aaf..f5c4243 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -31,6 +31,13 @@ public class ManagedLedgerException extends Exception { super(e); } +public static ManagedLedgerException getManagedLedgerException(Throwable e) { +if (e instanceof ManagedLedgerException) { +return (ManagedLedgerException) e; +} +return new ManagedLedgerException(e); +} + public static class MetaStoreException extends ManagedLedgerException { public MetaStoreException(Exception e) { su
svn commit: r20756 - /dev/incubator/pulsar/KEYS
Author: rdhabalia Date: Thu Aug 3 22:58:34 2017 New Revision: 20756 Log: Added gpg key for rdhabalia Modified: dev/incubator/pulsar/KEYS Modified: dev/incubator/pulsar/KEYS == --- dev/incubator/pulsar/KEYS (original) +++ dev/incubator/pulsar/KEYS Thu Aug 3 22:58:34 2017 @@ -51,3 +51,40 @@ COGVJWUffm3OZHzC6IgwavgsXbjp/JZIZH3AUdyZ /hPels3A0sS//jGruY3A4XfmrRFOi6+qh6em50kNRameseElLM3BOQ== =HWMB -END PGP PUBLIC KEY BLOCK- +pub rsa2048 2017-08-03 [SC] [expires: 2019-08-03] + CC251AE1C7C5BF592E9D6DE13956248899767A23 +uid [ultimate] rdhabalia <rdhaba...@apache.org> +sig 33956248899767A23 2017-08-03 rdhabalia <rdhaba...@apache.org> +sub rsa2048 2017-08-03 [E] [expires: 2019-08-03] +sig 3956248899767A23 2017-08-03 rdhabalia <rdhaba...@apache.org> + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQENBFmDpigBCAC8qs1e7eQj3a52vW4fS/xsfY0PlWfnwOiIhJ2OfD7gVhm8nyL1 +A/ySeUvqzsHjdYWSh/yICoKm5I++wgquS+h5CfR3WYfku5mtBzaph461rdpbSfpN +ESQGV7VFQHDHihMcXlUBsFPzuxWErjxHWaQNZjGTQR9+9flzpdP++CkSp9mlAqUb +fSsepcQEwm7PhioKNzDtiSmY1XbQwETMX7peKqSP3hE1igZlEXoHvVqRk5DY61V6 +W6iK78ZpY7bDay4omfYLc9IBWjvurxkH2lDpfDqD0Vz4WhViJLuRziPCFTpb/iXq +z4ENBzQs7pX0ATPjYYVfK84xNJ2J0a6vaVXTABEBAAG0IHJkaGFiYWxpYSA8cmRo +YWJhbGlhQGFwYWNoZS5vcmc+iQFUBBMBCAA+FiEEzCUa4cfFv1kunW3hOVYkiJl2 +eiMFAlmDpigCGwMFCQPCZwAFCwkIBwIGFQgJCgsCBBYCAwECHgECF4AACgkQOVYk +iJl2eiPmqwgAqraC9XYBnjJc8USNW3KUs9WNBt0g9oFAWiKr1x/CmDwC/BBla1Z8 +wzalm/NapC/Fhx2s9/Vmi6lDZ0l5JxI4Q6bDwPQEiGyMUapHXMbk+53X9jKpRn7W +HbORg392Z6On/zjbxvZD+xKqWoYTfdpU+C0nsPiCH6RdsxbHxeO00wcHainrRitC +rNWyDHNbWFWV1EkXjJTZFy8OO185Ads/qlYP1siFy6SdciG/y/vfUNM9auphVa5U +kOGTmEfUCB6YT4w0lUzHxWixZG2PDfKA9Vg2EI2Smqt3uwb0Z9UOUq6ZKmL4LqK6 +N7Ta2KhE+bY9mG45SgIwvmto0PyoRGF+/rkBDQRZg6YoAQgAqUaOPCESsnJ8Vh1h +/ao/VMRusX6eJOjGK2DhlLTc0rrear9a/or+lfYWV7emLCDmhsVYvhflHBtZsb36 +v5nDFwr2G+iHcQkewtU+TptMZT5gfY8dM2DwUooAFl1bHU0IIOeDXyJ44utDik86 +k6Rr4AoSVERU0Leo2w4NeZb8V2yPGsDf0ql5u95ZDkqTxPRLKp/CqRbSSXmZwHy6 +GC/o4jMPSwjbkh985tUYhCaKNW3pVTm7ETHiyxLFT3LFKCx3j3TjoNTJoHEFt+Ft +bSbsa0kUJYU9n20bakbTnFm/RilJs23D3YrutCO43l/nLt+2ronyBoU7N9IhRee6 +iUHrvwARAQABiQE8BBgBCAAmFiEEzCUa4cfFv1kunW3hOVYkiJl2eiMFAlmDpigC +GwwFCQPCZwAACgkQOVYkiJl2eiMGEwf9GZVx5pKcnGfUDnkopYtlVrO3I0X7WLKG +YMm/K4lhklKY3Yfb7lWyBvdYX4q1HUhTNzk/Fq1aPnv4Gio4cPobgfuF9rR6LCxY +0oRDtkiW3V3tp0R3ThJdczInlsiVvlMCSF4KjDjcFELS00WVKsxb73xxxnx45nWh +xcoGiU+YVrcPbWJXzoTwpTsSTCxirV3XTrI5fmS1FUBnJESnVBYj60HDapkdKxfp +hryaT5d9KzFMp0B7+XpvyhNJrywafgGOMNSo2zrr7hwzP8WrrjbGsaG73/L4H+B7 +fKBF9EgNhXGM+JpHk//JdTlnCyqzeCsfHSTQnfwjrsN30IKrcI8W5A== +=PNp3 +-END PGP PUBLIC KEY BLOCK-