This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2147201 Submit and run locally builtin connectors (#2114) 2147201 is described below commit 21472010db047398612ba94824ac56434f1ac38a Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jul 11 17:12:14 2018 -0700 Submit and run locally builtin connectors (#2114) * Submit and run locally builtin connectors * Fixed test class constructor * Fixed null check * Fixed function package update * Fixed sourceClassName when submiting custom archive --- bin/pulsar | 2 +- bin/pulsar-admin | 8 +- .../org/apache/pulsar/client/admin/Functions.java | 19 +++ .../client/admin/internal/FunctionsImpl.java | 25 +++- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 101 ++++++++++++++-- .../org/apache/pulsar/admin/cli/CmdSources.java | 99 ++++++++++++++-- .../proto/src/main/proto/Function.proto | 8 ++ .../org/apache/pulsar/functions/utils/Utils.java | 5 +- .../functions/utils/validation/ValidatorImpls.java | 12 +- .../pulsar/functions/worker/FunctionActioner.java | 128 +++++++++++++++++---- .../functions/worker/FunctionRuntimeManager.java | 9 +- .../pulsar/functions/worker/WorkerService.java | 6 +- .../functions/worker/rest/api/FunctionsImpl.java | 85 +++++++++++--- .../functions/worker/FunctionActionerTest.java | 8 +- .../worker/FunctionRuntimeManagerTest.java | 9 +- .../functions/worker/MembershipManagerTest.java | 9 +- 16 files changed, 448 insertions(+), 85 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index d8f6e21..c28bfe0 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -19,7 +19,7 @@ # BINDIR=$(dirname "$0") -PULSAR_HOME=`cd $BINDIR/..;pwd` +export PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf diff --git a/bin/pulsar-admin b/bin/pulsar-admin index c984617..1a1339d 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,7 @@ # BINDIR=$(dirname "$0") -PULSAR_HOME=`cd $BINDIR/..;pwd` +export PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml @@ -55,7 +55,7 @@ fi # exclude tests jar BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then +if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then echo "\nCouldn't find pulsar jar."; echo "Make sure you've run 'mvn package'\n"; exit 1; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index d34f419..9f7339c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.admin; import java.util.List; +import java.util.Set; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -232,4 +233,22 @@ public interface Functions { * */ List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sources currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + Set<String> getSources() throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sinks currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + Set<String> getSinks() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 5cb9284..a2008cf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.StandardCopyOption; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; @@ -37,6 +39,7 @@ import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; @@ -110,7 +113,10 @@ public class FunctionsImpl extends BaseResource implements Functions { try { final FormDataMultiPart mp = new FormDataMultiPart(); - mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + if (fileName != null && !fileName.startsWith("builtin://")) { + // If the function code is built in, we don't need to submit here + mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + } mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails), @@ -153,9 +159,12 @@ public class FunctionsImpl extends BaseResource implements Functions { public void updateFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDataMultiPart(); - if (fileName != null) { + + if (fileName != null && !fileName.startsWith("builtin://")) { + // If the function code is built in, we don't need to submit here mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); } + mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails), MediaType.APPLICATION_JSON_TYPE)); @@ -251,6 +260,18 @@ public class FunctionsImpl extends BaseResource implements Functions { } } + @Override + public Set<String> getSources() throws PulsarAdminException { + return getConnectorsList().stream().filter(c -> !StringUtils.isEmpty(c.getSourceClass())) + .map(ConnectorDefinition::getName).collect(Collectors.toSet()); + } + + @Override + public Set<String> getSinks() throws PulsarAdminException { + return getConnectorsList().stream().filter(c -> !StringUtils.isEmpty(c.getSinkClass())) + .map(ConnectorDefinition::getName).collect(Collectors.toSet()); + } + public static void mergeJson(String json, Builder builder) throws IOException { JsonFormat.parser().merge(json, builder); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 2c91588..c27b40a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -24,12 +24,26 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; @@ -44,6 +58,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.SinkConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import java.io.File; @@ -139,6 +154,25 @@ public class CmdSinks extends CmdBase { .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), sinkConfig.getArchive(), admin); } + + @Override + protected String validateSinkType(String sinkType) throws IOException { + // Validate the connector sink type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); + + if (!connectors.getSinks().containsKey(sinkType)) { + throw new ParameterException("Invalid sink type '" + sinkType + "' -- Available sinks are: " + + connectors.getSinks().keySet()); + } + + // Sink type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSinks().get(sinkType).toString(); + } } @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster") @@ -174,6 +208,10 @@ public class CmdSinks extends CmdBase { protected String namespace; @Parameter(names = "--name", description = "The sink's name") protected String name; + + @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider") + protected String sinkType; + @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)") protected String inputs; @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") @@ -246,10 +284,18 @@ public class CmdSinks extends CmdBase { sinkConfig.setParallelism(parallelism); } + if (archive != null && sinkType != null) { + throw new ParameterException("Cannot specify both archive and sink-type"); + } + if (null != archive) { sinkConfig.setArchive(archive); } + if (sinkType != null) { + sinkConfig.setArchive(validateSinkType(sinkType)); + } + org.apache.pulsar.functions.utils.Resources resources = sinkConfig.getResources(); if (resources == null) { resources = new org.apache.pulsar.functions.utils.Resources(); @@ -307,6 +353,7 @@ public class CmdSinks extends CmdBase { throw new ParameterException("Sink archive not specfied"); } + boolean isConnectorBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()); String archivePath = null; @@ -326,6 +373,9 @@ public class CmdSinks extends CmdBase { + ", due to =" + e.getMessage()); } } + } else if (isConnectorBuiltin) { + // Ignore local checks when submitting built-in connector + archivePath = null; } else { archivePath = sinkConfig.getArchive(); } @@ -339,6 +389,7 @@ public class CmdSinks extends CmdBase { try { ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath); log.info("Connector: {}", connector); + // Validate sink class ConnectorUtils.getIOSinkClass(archivePath); } catch (IOException e) { @@ -368,16 +419,23 @@ public class CmdSinks extends CmdBase { // check if configs are valid validateSinkConfigs(sinkConfig); - String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); + String sinkClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); - String typeArg; - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), - Collections.emptySet())) { - typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null - : Utils.getSinkType(sinkClassName, ncl).getName(); + if (!isBuiltin) { + sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); + + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), + Collections.emptySet())) { + typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null + : Utils.getSinkType(sinkClassName, ncl).getName(); + } } - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sinkConfig.getTenant() != null) { functionDetailsBuilder.setTenant(sinkConfig.getTenant()); } @@ -414,7 +472,15 @@ public class CmdSinks extends CmdBase { // set up sink spec SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - sinkSpecBuilder.setClassName(sinkClassName); + if (sinkClassName != null) { + sinkSpecBuilder.setClassName(sinkClassName); + } + + if (isBuiltin) { + String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", ""); + sinkSpecBuilder.setBuiltin(builtin); + } + if (sinkConfig.getConfigs() != null) { sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); } @@ -438,6 +504,23 @@ public class CmdSinks extends CmdBase { } return functionDetailsBuilder.build(); } + + protected String validateSinkType(String sinkType) throws IOException { + Set<String> availableSinks; + try { + availableSinks = admin.functions().getSinks(); + } catch (PulsarAdminException e) { + throw new IOException(e); + } + + if (!availableSinks.contains(sinkType)) { + throw new ParameterException( + "Invalid sink type '" + sinkType + "' -- Available sinks are: " + availableSinks); + } + + // Source type is a valid built-in connector type + return "builtin://" + sinkType; + } } @Parameters(commandDescription = "Stops a Pulsar IO sink connector") @@ -475,7 +558,7 @@ public class CmdSinks extends CmdBase { } @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster") - public class ListSinks extends SinkCommand { + public class ListSinks extends BaseCommand { @Override void runCmd() throws Exception { admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSinkClass())) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 53609d0..7c147e5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -24,12 +24,23 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; @@ -43,6 +54,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import java.io.File; @@ -135,6 +147,25 @@ public class CmdSources extends CmdBase { .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), sourceConfig.getArchive(), admin); } + + @Override + protected String validateSourceType(String sourceType) throws IOException { + // Validate the connector source type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); + + if (!connectors.getSources().containsKey(sourceType)) { + throw new ParameterException("Invalid source type '" + sourceType + "' -- Available sources are: " + + connectors.getSources().keySet()); + } + + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSources().get(sourceType).toString(); + } } @Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster") @@ -170,10 +201,12 @@ public class CmdSources extends CmdBase { protected String namespace; @Parameter(names = "--name", description = "The source's name") protected String name; + + @Parameter(names = { "-t", "--source-type" }, description = "The source's connector provider") + protected String sourceType; + @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source") protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--className", description = "The source's class name") - protected String className; @Parameter(names = "--destinationTopicName", description = "The Pulsar topic to which data is sent") protected String destinationTopicName; @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source") @@ -228,10 +261,18 @@ public class CmdSources extends CmdBase { sourceConfig.setParallelism(parallelism); } + if (archive != null && sourceType != null) { + throw new ParameterException("Cannot specify both archive and source-type"); + } + if (archive != null) { sourceConfig.setArchive(archive); } + if (sourceType != null) { + sourceConfig.setArchive(validateSourceType(sourceType)); + } + org.apache.pulsar.functions.utils.Resources resources = sourceConfig.getResources(); if (resources == null) { resources = new org.apache.pulsar.functions.utils.Resources(); @@ -276,6 +317,7 @@ public class CmdSources extends CmdBase { throw new ParameterException("Source archive not specfied"); } + boolean isConnectorBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()); String archivePath = null; @@ -295,6 +337,9 @@ public class CmdSources extends CmdBase { + ", due to =" + e.getMessage()); } } + } else if (isConnectorBuiltin) { + // Ignore local checks when submitting built-in connector + archivePath = null; } else { archivePath = sourceConfig.getArchive(); } @@ -338,16 +383,23 @@ public class CmdSources extends CmdBase { // check if source configs are valid validateSourceConfigs(sourceConfig); - String sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); + String sourceClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); + + if (!isBuiltin) { + sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); - String typeArg; - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), - Collections.emptySet())) { - typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? null - : getSourceType(sourceClassName, ncl).getName(); + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), + Collections.emptySet())) { + typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? null + : getSourceType(sourceClassName, ncl).getName(); + } } - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sourceConfig.getTenant() != null) { functionDetailsBuilder.setTenant(sourceConfig.getTenant()); } @@ -368,7 +420,14 @@ public class CmdSources extends CmdBase { // set source spec SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setClassName(sourceClassName); + if (sourceClassName != null) { + sourceSpecBuilder.setClassName(sourceClassName); + } + + if (isBuiltin) { + String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", ""); + sourceSpecBuilder.setBuiltin(builtin); + } if (sourceConfig.getConfigs() != null) { sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); @@ -409,6 +468,23 @@ public class CmdSources extends CmdBase { return functionDetailsBuilder.build(); } + + protected String validateSourceType(String sourceType) throws IOException { + Set<String> availableSources; + try { + availableSources = admin.functions().getSources(); + } catch (PulsarAdminException e) { + throw new IOException(e); + } + + if (!availableSources.contains(sourceType)) { + throw new ParameterException( + "Invalid source type '" + sourceType + "' -- Available sources are: " + availableSources); + } + + // Source type is a valid built-in connector type + return "builtin://" + sourceType; + } } @Parameters(commandDescription = "Stops a Pulsar IO source connector") @@ -446,7 +522,7 @@ public class CmdSources extends CmdBase { } @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster") - public class ListSources extends SourceCommand { + public class ListSources extends BaseCommand { @Override void runCmd() throws Exception { admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) @@ -457,5 +533,4 @@ public class CmdSources extends CmdBase { }); } } - } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 0f8b649..01b3660 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -71,6 +71,10 @@ message SourceSpec { map<string,string> topicsToSerDeClassName = 4; uint64 timeoutMs = 6; string topicsPattern = 7; + + /* If specified, this will refer to an archive that is + * already present in the server */ + string builtin = 8; } message SinkSpec { @@ -82,6 +86,10 @@ message SinkSpec { // configs used only when functions output to sink string topic = 3; string serDeClassName = 4; + + /* If specified, this will refer to an archive that is + * already present in the server */ + string builtin = 6; } message PackageLocationMetaData { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 8b5e9fb..0c25be2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -55,6 +55,7 @@ public class Utils { public static String HTTP = "http"; public static String FILE = "file"; + public static String BUILTIN = "builtin"; public static final long getSequenceId(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) @@ -222,7 +223,7 @@ public class Utils { } public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return isNotBlank(functionPkgUrl) - && (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); + return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP) + || functionPkgUrl.startsWith(Utils.FILE)); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index 7ff6b97..297a2f2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -643,6 +643,11 @@ public class ValidatorImpls { @Override public void validateField(String name, Object o) { SourceConfig sourceConfig = (SourceConfig) o; + if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { + // We don't have to check the archive, since it's provided on the worker itself + return; + } + String sourceClassName; try { sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); @@ -714,6 +719,11 @@ public class ValidatorImpls { @Override public void validateField(String name, Object o) { SinkConfig sinkConfig = (SinkConfig) o; + if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { + // We don't have to check the archive, since it's provided on the worker itself + return; + } + // if function-pkg url is present eg: file://xyz.jar then admin-tool might not have access of the file at // the same location so, need to rely on server side validation. if (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive())) { @@ -805,7 +815,7 @@ public class ValidatorImpls { if(!Utils.isFunctionPackageUrlSupported(path)) { // check file existence if path is not url and local path - if (!fileExists(path)) { + if (!path.startsWith(Utils.BUILTIN) && !fileExists(path)) { throw new IllegalArgumentException (String.format("File %s specified in field '%s' does not exist", path, name)); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index ef519f4..4741c40 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -20,8 +20,13 @@ package org.apache.pulsar.functions.worker; import static org.apache.pulsar.functions.utils.Utils.FILE; import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.MoreFiles; +import com.google.common.io.RecursiveDeleteOption; + import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -30,29 +35,32 @@ import java.net.URL; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.MoreFiles; -import com.google.common.io.RecursiveDeleteOption; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; @Data @Setter @@ -68,15 +76,18 @@ public class FunctionActioner implements AutoCloseable { private LinkedBlockingQueue<FunctionAction> actionQueue; private volatile boolean running; private Thread actioner; + private final ConnectorsManager connectorsManager; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, - LinkedBlockingQueue<FunctionAction> actionQueue) { + LinkedBlockingQueue<FunctionAction> actionQueue, + ConnectorsManager connectorsManager) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.actionQueue = actionQueue; + this.connectorsManager = connectorsManager; actioner = new Thread(() -> { log.info("Starting Actioner Thread..."); while(running) { @@ -118,30 +129,33 @@ public class FunctionActioner implements AutoCloseable { protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - log.info("Starting function {} - {} ...", - functionMetaData.getFunctionDetails().getName(), instanceId); + + FunctionDetails.Builder functionDetails = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()); + log.info("Starting function {} - {} ...", functionDetails.getName(), instanceId); File pkgFile = null; - + String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); - - if(isPkgUrlProvided && pkgLocation.startsWith(FILE)) { + + if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { URL url = new URL(pkgLocation); pkgFile = new File(url.toURI()); + } else if (isFunctionCodeBuiltin(functionDetails)) { + pkgFile = getBuiltinArchive(functionDetails); } else { File pkgDir = new File( workerConfig.getDownloadDirectory(), getDownloadPackagePath(functionMetaData, instanceId)); pkgDir.mkdirs(); - + pkgFile = new File( pkgDir, new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName()); downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); } - + InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails()); + instanceConfig.setFunctionDetails(functionDetails.build()); // TODO: set correct function id and version when features implemented instanceConfig.setFunctionId(UUID.randomUUID().toString()); instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); @@ -156,9 +170,9 @@ public class FunctionActioner implements AutoCloseable { } private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException { - + File pkgDir = pkgFile.getParentFile(); - + if (pkgFile.exists()) { log.warn("Function package exists already {} deleting it", pkgFile); @@ -178,7 +192,7 @@ public class FunctionActioner implements AutoCloseable { boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); log.info("Function package file {} will be downloaded from {}", tempPkgFile, downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); - + if(downloadFromHttp) { Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile)); } else { @@ -187,7 +201,7 @@ public class FunctionActioner implements AutoCloseable { new FileOutputStream(tempPkgFile), pkgLocationPath); } - + try { // create a hardlink, if there are two concurrent createLink operations, one will fail. // this ensures one instance will successfully download the package. @@ -243,4 +257,70 @@ public class FunctionActioner implements AutoCloseable { }, File.separatorChar); } + + public static boolean isFunctionCodeBuiltin(FunctionDetailsOrBuilder functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return true; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return true; + } + } + + return false; + } + + private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile(); + String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass(); + SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource()); + builder.setClassName(sourceClass); + functionDetails.setSource(builder); + + fillSourceSinkTypeClass(functionDetails, archive, sourceClass); + return archive; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile(); + String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass(); + SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink()); + builder.setClassName(sinkClass); + functionDetails.setSink(builder); + + fillSourceSinkTypeClass(functionDetails, archive, sinkClass); + return archive; + } + } + + throw new IOException("Could not find built in archive definition"); + } + + private void fillSourceSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className) + throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) { + String typeArg = getSourceType(className, ncl).getName(); + + SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource()); + sourceBuilder.setTypeClassName(typeArg); + functionDetails.setSource(sourceBuilder); + + SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink()); + sinkBuilder.setTypeClassName(typeArg); + functionDetails.setSink(sinkBuilder); + } + } + } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 5c6184f..08de636 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -78,13 +78,16 @@ public class FunctionRuntimeManager implements AutoCloseable{ private RuntimeFactory runtimeFactory; private MembershipManager membershipManager; + private final ConnectorsManager connectorsManager; public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Namespace dlogNamespace, - MembershipManager membershipManager) throws Exception { + MembershipManager membershipManager, + ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; + this.connectorsManager = connectorsManager; Reader<byte[]> reader = pulsarClient.newReader() .topic(this.workerConfig.getFunctionAssignmentTopic()) @@ -99,7 +102,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()) .useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()) .tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build(); - + if (workerConfig.getThreadContainerFactory() != null) { this.runtimeFactory = new ThreadRuntimeFactory( workerConfig.getThreadContainerFactory().getThreadGroupName(), @@ -121,7 +124,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.actionQueue = new LinkedBlockingQueue<>(); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, actionQueue); + dlogNamespace, actionQueue, connectorsManager); this.membershipManager = membershipManager; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0237700..4393331 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -102,12 +102,14 @@ public class WorkerService { this.functionMetaDataManager = new FunctionMetaDataManager( this.workerConfig, this.schedulerManager, this.client); + this.connectorsManager = new ConnectorsManager(workerConfig); + //create membership manager this.membershipManager = new MembershipManager(this.workerConfig, this.client); // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); + this.workerConfig, this.client, this.dlogNamespace, this.membershipManager, connectorsManager); // Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); @@ -136,8 +138,6 @@ public class WorkerService { // indicate function worker service is done intializing this.isInitialized = true; - this.connectorsManager = new ConnectorsManager(workerConfig); - } catch (Throwable t) { log.error("Error Starting up in worker", t); throw new RuntimeException(t); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index a9de4d7..4ff66c6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -25,6 +25,8 @@ import static org.apache.pulsar.functions.utils.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; +import com.google.gson.Gson; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -51,6 +53,8 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; +import lombok.extern.slf4j.Slf4j; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; @@ -64,6 +68,8 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; @@ -77,9 +83,6 @@ import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import com.google.gson.Gson; - -import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; @Slf4j @@ -148,12 +151,17 @@ public class FunctionsImpl { FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); - PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); - functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); + boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); + if (isBuiltin) { + packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + } else { + packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl + : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + } - return isPkgUrlProvided ? updateRequest(functionMetaDataBuilder.build()) + functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } @@ -193,12 +201,18 @@ public class FunctionsImpl { FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); - PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); - functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); + + boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); + if (isBuiltin) { + packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + } else { + packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl + : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + } - return isPkgUrlProvided ? updateRequest(functionMetaDataBuilder.build()) + functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } @@ -659,12 +673,53 @@ public class FunctionsImpl { private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson) throws IllegalArgumentException { - if (uploadedInputStream == null || fileDetail == null) { + + FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + functionDetailsJson, null); + if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == null || fileDetail == null)) { throw new IllegalArgumentException("Function Package is not provided"); } - return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson, null); + + return functionDetails; + } + + private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return true; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return true; + } + } + + return false; } + private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return sourceSpec.getBuiltin(); + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return sinkSpec.getBuiltin(); + } + } + + return null; + } + + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, String functionDetailsJson, File jarWithFileUrl) throws IllegalArgumentException { if (tenant == null) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index a1bcd4a..5754477 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -47,7 +47,7 @@ public class FunctionActionerTest { /** * Validates FunctionActioner tries to download file from bk. - * + * * @throws Exception */ @Test @@ -68,7 +68,8 @@ public class FunctionActionerTest { LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue); + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + new ConnectorsManager(workerConfig)); Runtime runtime = mock(Runtime.class); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") @@ -110,7 +111,8 @@ public class FunctionActionerTest { LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue); + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + new ConnectorsManager(workerConfig)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 690f474..8ab7473 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -92,7 +92,8 @@ public class FunctionRuntimeManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( @@ -185,7 +186,8 @@ public class FunctionRuntimeManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( @@ -282,7 +284,8 @@ public class FunctionRuntimeManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 6dd3fa3..6bf1343 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -135,7 +135,8 @@ public class MembershipManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); @@ -199,7 +200,8 @@ public class MembershipManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); @@ -288,7 +290,8 @@ public class MembershipManagerTest { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));