This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2147201  Submit and run locally builtin connectors (#2114)
2147201 is described below

commit 21472010db047398612ba94824ac56434f1ac38a
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jul 11 17:12:14 2018 -0700

    Submit and run locally builtin connectors (#2114)
    
    * Submit and run locally builtin connectors
    
    * Fixed test class constructor
    
    * Fixed null check
    
    * Fixed function package update
    
    * Fixed sourceClassName when submiting custom archive
---
 bin/pulsar                                         |   2 +-
 bin/pulsar-admin                                   |   8 +-
 .../org/apache/pulsar/client/admin/Functions.java  |  19 +++
 .../client/admin/internal/FunctionsImpl.java       |  25 +++-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 101 ++++++++++++++--
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  99 ++++++++++++++--
 .../proto/src/main/proto/Function.proto            |   8 ++
 .../org/apache/pulsar/functions/utils/Utils.java   |   5 +-
 .../functions/utils/validation/ValidatorImpls.java |  12 +-
 .../pulsar/functions/worker/FunctionActioner.java  | 128 +++++++++++++++++----
 .../functions/worker/FunctionRuntimeManager.java   |   9 +-
 .../pulsar/functions/worker/WorkerService.java     |   6 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |  85 +++++++++++---
 .../functions/worker/FunctionActionerTest.java     |   8 +-
 .../worker/FunctionRuntimeManagerTest.java         |   9 +-
 .../functions/worker/MembershipManagerTest.java    |   9 +-
 16 files changed, 448 insertions(+), 85 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index d8f6e21..c28bfe0 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -19,7 +19,7 @@
 #
 
 BINDIR=$(dirname "$0")
-PULSAR_HOME=`cd $BINDIR/..;pwd`
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
 
 DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
 DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index c984617..1a1339d 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 #
 
 BINDIR=$(dirname "$0")
-PULSAR_HOME=`cd $BINDIR/..;pwd`
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
 
 DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
 DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
@@ -55,7 +55,7 @@ fi
 
 # exclude tests jar
 BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> 
/dev/null | grep -v tests | tail -1`
-if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then 
+if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
     echo "\nCouldn't find pulsar jar.";
     echo "Make sure you've run 'mvn package'\n";
     exit 1;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index d34f419..9f7339c 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.List;
+import java.util.Set;
 
 import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -232,4 +233,22 @@ public interface Functions {
      *
      */
     List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sources currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    Set<String> getSources() throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sinks currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    Set<String> getSinks() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 5cb9284..a2008cf 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.StandardCopyOption;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
@@ -37,6 +39,7 @@ import javax.ws.rs.core.Response;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
@@ -110,7 +113,10 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
-            mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
 
             mp.bodyPart(new FormDataBodyPart("functionDetails",
                 printJson(functionDetails),
@@ -153,9 +159,12 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
     public void updateFunction(FunctionDetails functionDetails, String 
fileName) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
-            if (fileName != null) {
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
                 mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
             }
+
             mp.bodyPart(new FormDataBodyPart("functionDetails",
                 printJson(functionDetails),
                 MediaType.APPLICATION_JSON_TYPE));
@@ -251,6 +260,18 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
         }
     }
 
+    @Override
+    public Set<String> getSources() throws PulsarAdminException {
+        return getConnectorsList().stream().filter(c -> 
!StringUtils.isEmpty(c.getSourceClass()))
+                .map(ConnectorDefinition::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<String> getSinks() throws PulsarAdminException {
+        return getConnectorsList().stream().filter(c -> 
!StringUtils.isEmpty(c.getSinkClass()))
+                .map(ConnectorDefinition::getName).collect(Collectors.toSet());
+    }
+
     public static void mergeJson(String json, Builder builder) throws 
IOException {
         JsonFormat.parser().merge(json, builder);
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 2c91588..c27b40a 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -24,12 +24,26 @@ import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -44,6 +58,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 import java.io.File;
@@ -139,6 +154,25 @@ public class CmdSinks extends CmdBase {
                             
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                     sinkConfig.getArchive(), admin);
         }
+
+        @Override
+        protected String validateSinkType(String sinkType) throws IOException {
+            // Validate the connector sink type from the locally available 
connectors
+            String pulsarHome = System.getenv("PULSAR_HOME");
+            if (pulsarHome == null) {
+                pulsarHome = Paths.get("").toAbsolutePath().toString();
+            }
+            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
+            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
+
+            if (!connectors.getSinks().containsKey(sinkType)) {
+                throw new ParameterException("Invalid sink type '" + sinkType 
+ "' -- Available sinks are: "
+                        + connectors.getSinks().keySet());
+            }
+
+            // Sink type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSinks().get(sinkType).toString();
+        }
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run 
in a Pulsar cluster")
@@ -174,6 +208,10 @@ public class CmdSinks extends CmdBase {
         protected String namespace;
         @Parameter(names = "--name", description = "The sink's name")
         protected String name;
+
+        @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's 
connector provider")
+        protected String sinkType;
+
         @Parameter(names = "--inputs", description = "The sink's input topic 
or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
@@ -246,10 +284,18 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setParallelism(parallelism);
             }
 
+            if (archive != null && sinkType != null) {
+                throw new ParameterException("Cannot specify both archive and 
sink-type");
+            }
+
             if (null != archive) {
                 sinkConfig.setArchive(archive);
             }
 
+            if (sinkType != null) {
+                sinkConfig.setArchive(validateSinkType(sinkType));
+            }
+
             org.apache.pulsar.functions.utils.Resources resources = 
sinkConfig.getResources();
             if (resources == null) {
                 resources = new org.apache.pulsar.functions.utils.Resources();
@@ -307,6 +353,7 @@ public class CmdSinks extends CmdBase {
                 throw new ParameterException("Sink archive not specfied");
             }
 
+            boolean isConnectorBuiltin = 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
             boolean isArchivePathUrl = 
Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive());
 
             String archivePath = null;
@@ -326,6 +373,9 @@ public class CmdSinks extends CmdBase {
                                 + ", due to =" + e.getMessage());
                     }
                 }
+            } else if (isConnectorBuiltin) {
+                // Ignore local checks when submitting built-in connector
+                archivePath = null;
             } else {
                 archivePath = sinkConfig.getArchive();
             }
@@ -339,6 +389,7 @@ public class CmdSinks extends CmdBase {
                 try {
                     ConnectorDefinition connector = 
ConnectorUtils.getConnectorDefinition(archivePath);
                     log.info("Connector: {}", connector);
+
                     // Validate sink class
                     ConnectorUtils.getIOSinkClass(archivePath);
                 } catch (IOException e) {
@@ -368,16 +419,23 @@ public class CmdSinks extends CmdBase {
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
 
-            String sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
+            String sinkClassName = null;
+            String typeArg = null;
+
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+
+            boolean isBuiltin = 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
 
-            String typeArg;
-            try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
-                    Collections.emptySet())) {
-                typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null
-                        : Utils.getSinkType(sinkClassName, ncl).getName();
+            if (!isBuiltin) {
+                sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
+
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? 
null
+                            : Utils.getSinkType(sinkClassName, ncl).getName();
+                }
             }
 
-            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
             if (sinkConfig.getTenant() != null) {
                 functionDetailsBuilder.setTenant(sinkConfig.getTenant());
             }
@@ -414,7 +472,15 @@ public class CmdSinks extends CmdBase {
 
             // set up sink spec
             SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(sinkClassName);
+            if (sinkClassName != null) {
+                sinkSpecBuilder.setClassName(sinkClassName);
+            }
+
+            if (isBuiltin) {
+                String builtin = 
sinkConfig.getArchive().replaceFirst("^builtin://", "");
+                sinkSpecBuilder.setBuiltin(builtin);
+            }
+
             if (sinkConfig.getConfigs() != null) {
                 sinkSpecBuilder.setConfigs(new 
Gson().toJson(sinkConfig.getConfigs()));
             }
@@ -438,6 +504,23 @@ public class CmdSinks extends CmdBase {
             }
             return functionDetailsBuilder.build();
         }
+
+        protected String validateSinkType(String sinkType) throws IOException {
+            Set<String> availableSinks;
+            try {
+                availableSinks = admin.functions().getSinks();
+            } catch (PulsarAdminException e) {
+                throw new IOException(e);
+            }
+
+            if (!availableSinks.contains(sinkType)) {
+                throw new ParameterException(
+                        "Invalid sink type '" + sinkType + "' -- Available 
sinks are: " + availableSinks);
+            }
+
+            // Source type is a valid built-in connector type
+            return "builtin://" + sinkType;
+        }
     }
 
     @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
@@ -475,7 +558,7 @@ public class CmdSinks extends CmdBase {
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sinks supported by Pulsar cluster")
-    public class ListSinks extends SinkCommand {
+    public class ListSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
             admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSinkClass()))
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 53609d0..7c147e5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -24,12 +24,23 @@ import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -43,6 +54,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 import java.io.File;
@@ -135,6 +147,25 @@ public class CmdSources extends CmdBase {
                             
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                     sourceConfig.getArchive(), admin);
         }
+
+        @Override
+        protected String validateSourceType(String sourceType) throws 
IOException {
+            // Validate the connector source type from the locally available 
connectors
+            String pulsarHome = System.getenv("PULSAR_HOME");
+            if (pulsarHome == null) {
+                pulsarHome = Paths.get("").toAbsolutePath().toString();
+            }
+            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
+            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
+
+            if (!connectors.getSources().containsKey(sourceType)) {
+                throw new ParameterException("Invalid source type '" + 
sourceType + "' -- Available sources are: "
+                        + connectors.getSources().keySet());
+            }
+
+            // Source type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSources().get(sourceType).toString();
+        }
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO source connector to 
run in a Pulsar cluster")
@@ -170,10 +201,12 @@ public class CmdSources extends CmdBase {
         protected String namespace;
         @Parameter(names = "--name", description = "The source's name")
         protected String name;
+
+        @Parameter(names = { "-t", "--source-type" }, description = "The 
source's connector provider")
+        protected String sourceType;
+
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the Source")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--className", description = "The source's class 
name")
-        protected String className;
         @Parameter(names = "--destinationTopicName", description = "The Pulsar 
topic to which data is sent")
         protected String destinationTopicName;
         @Parameter(names = "--deserializationClassName", description = "The 
SerDe classname for the source")
@@ -228,10 +261,18 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setParallelism(parallelism);
             }
 
+            if (archive != null && sourceType != null) {
+                throw new ParameterException("Cannot specify both archive and 
source-type");
+            }
+
             if (archive != null) {
                 sourceConfig.setArchive(archive);
             }
 
+            if (sourceType != null) {
+                sourceConfig.setArchive(validateSourceType(sourceType));
+            }
+
             org.apache.pulsar.functions.utils.Resources resources = 
sourceConfig.getResources();
             if (resources == null) {
                 resources = new org.apache.pulsar.functions.utils.Resources();
@@ -276,6 +317,7 @@ public class CmdSources extends CmdBase {
                 throw new ParameterException("Source archive not specfied");
             }
 
+            boolean isConnectorBuiltin = 
sourceConfig.getArchive().startsWith(Utils.BUILTIN);
             boolean isArchivePathUrl = 
Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive());
 
             String archivePath = null;
@@ -295,6 +337,9 @@ public class CmdSources extends CmdBase {
                                 + ", due to =" + e.getMessage());
                     }
                 }
+            } else if (isConnectorBuiltin) {
+                // Ignore local checks when submitting built-in connector
+                archivePath = null;
             } else {
                 archivePath = sourceConfig.getArchive();
             }
@@ -338,16 +383,23 @@ public class CmdSources extends CmdBase {
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
 
-            String sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+            String sourceClassName = null;
+            String typeArg = null;
+
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+
+            boolean isBuiltin = 
sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+
+            if (!isBuiltin) {
+                sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
 
-            String typeArg;
-            try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sourceConfig.getArchive()),
-                    Collections.emptySet())) {
-                typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? 
null
-                        : getSourceType(sourceClassName, ncl).getName();
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sourceConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) 
? null
+                            : getSourceType(sourceClassName, ncl).getName();
+                }
             }
 
-            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
             if (sourceConfig.getTenant() != null) {
                 functionDetailsBuilder.setTenant(sourceConfig.getTenant());
             }
@@ -368,7 +420,14 @@ public class CmdSources extends CmdBase {
 
             // set source spec
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(sourceClassName);
+            if (sourceClassName != null) {
+                sourceSpecBuilder.setClassName(sourceClassName);
+            }
+
+            if (isBuiltin) {
+                String builtin = 
sourceConfig.getArchive().replaceFirst("^builtin://", "");
+                sourceSpecBuilder.setBuiltin(builtin);
+            }
 
             if (sourceConfig.getConfigs() != null) {
                 sourceSpecBuilder.setConfigs(new 
Gson().toJson(sourceConfig.getConfigs()));
@@ -409,6 +468,23 @@ public class CmdSources extends CmdBase {
 
             return functionDetailsBuilder.build();
         }
+
+        protected String validateSourceType(String sourceType) throws 
IOException {
+            Set<String> availableSources;
+            try {
+                availableSources = admin.functions().getSources();
+            } catch (PulsarAdminException e) {
+                throw new IOException(e);
+            }
+
+            if (!availableSources.contains(sourceType)) {
+                throw new ParameterException(
+                        "Invalid source type '" + sourceType + "' -- Available 
sources are: " + availableSources);
+            }
+
+            // Source type is a valid built-in connector type
+            return "builtin://" + sourceType;
+        }
     }
 
     @Parameters(commandDescription = "Stops a Pulsar IO source connector")
@@ -446,7 +522,7 @@ public class CmdSources extends CmdBase {
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sources supported by Pulsar cluster")
-    public class ListSources extends SourceCommand {
+    public class ListSources extends BaseCommand {
         @Override
         void runCmd() throws Exception {
             admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
@@ -457,5 +533,4 @@ public class CmdSources extends CmdBase {
                     });
         }
     }
-
 }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 0f8b649..01b3660 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -71,6 +71,10 @@ message SourceSpec {
     map<string,string> topicsToSerDeClassName = 4;
     uint64 timeoutMs = 6;
     string topicsPattern = 7;
+
+    /* If specified, this will refer to an archive that is
+     * already present in the server */
+    string builtin = 8;
 }
 
 message SinkSpec {
@@ -82,6 +86,10 @@ message SinkSpec {
     // configs used only when functions output to sink
     string topic = 3;
     string serDeClassName = 4;
+
+    /* If specified, this will refer to an archive that is
+     * already present in the server */
+    string builtin = 6;
 }
 
 message PackageLocationMetaData {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 8b5e9fb..0c25be2 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -55,6 +55,7 @@ public class Utils {
 
     public static String HTTP = "http";
     public static String FILE = "file";
+    public static String BUILTIN = "builtin";
 
     public static final long getSequenceId(MessageId messageId) {
         MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof 
TopicMessageIdImpl)
@@ -222,7 +223,7 @@ public class Utils {
     }
 
     public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) 
{
-        return isNotBlank(functionPkgUrl)
-                && (functionPkgUrl.startsWith(Utils.HTTP) || 
functionPkgUrl.startsWith(Utils.FILE));
+        return isNotBlank(functionPkgUrl) && 
(functionPkgUrl.startsWith(Utils.HTTP)
+                || functionPkgUrl.startsWith(Utils.FILE));
     }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 7ff6b97..297a2f2 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -643,6 +643,11 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o) {
             SourceConfig sourceConfig = (SourceConfig) o;
+            if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
+                // We don't have to check the archive, since it's provided on 
the worker itself
+                return;
+            }
+
             String sourceClassName;
             try {
                 sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
@@ -714,6 +719,11 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o) {
             SinkConfig sinkConfig = (SinkConfig) o;
+            if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
+                // We don't have to check the archive, since it's provided on 
the worker itself
+                return;
+            }
+
             // if function-pkg url is present eg: file://xyz.jar then 
admin-tool might not have access of the file at
             // the same location so, need to rely on server side validation.
             if (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive())) {
@@ -805,7 +815,7 @@ public class ValidatorImpls {
 
             if(!Utils.isFunctionPackageUrlSupported(path)) {
                 // check file existence if path is not url and local path
-                if (!fileExists(path)) {
+                if (!path.startsWith(Utils.BUILTIN) && !fileExists(path)) {
                     throw new IllegalArgumentException
                             (String.format("File %s specified in field '%s' 
does not exist", path, name));
                 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ef519f4..4741c40 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -20,8 +20,13 @@ package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
+import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 import static 
org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -30,29 +35,32 @@ import java.net.URL;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.MoreFiles;
-import com.google.common.io.RecursiveDeleteOption;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 @Data
 @Setter
@@ -68,15 +76,18 @@ public class FunctionActioner implements AutoCloseable {
     private LinkedBlockingQueue<FunctionAction> actionQueue;
     private volatile boolean running;
     private Thread actioner;
+    private final ConnectorsManager connectorsManager;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue) {
+                            LinkedBlockingQueue<FunctionAction> actionQueue,
+                            ConnectorsManager connectorsManager) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
         this.actionQueue = actionQueue;
+        this.connectorsManager = connectorsManager;
         actioner = new Thread(() -> {
             log.info("Starting Actioner Thread...");
             while(running) {
@@ -118,30 +129,33 @@ public class FunctionActioner implements AutoCloseable {
     protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) 
throws Exception {
         FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
         int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-        log.info("Starting function {} - {} ...",
-                functionMetaData.getFunctionDetails().getName(), instanceId);
+
+        FunctionDetails.Builder functionDetails = 
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
+        log.info("Starting function {} - {} ...", functionDetails.getName(), 
instanceId);
         File pkgFile = null;
-        
+
         String pkgLocation = 
functionMetaData.getPackageLocation().getPackagePath();
         boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
-        
-        if(isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
+
+        if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
             URL url = new URL(pkgLocation);
             pkgFile = new File(url.toURI());
+        } else if (isFunctionCodeBuiltin(functionDetails)) {
+            pkgFile = getBuiltinArchive(functionDetails);
         } else {
             File pkgDir = new File(
                     workerConfig.getDownloadDirectory(),
                     getDownloadPackagePath(functionMetaData, instanceId));
             pkgDir.mkdirs();
-            
+
             pkgFile = new File(
                     pkgDir,
                     new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName());
             downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
         }
-        
+
         InstanceConfig instanceConfig = new InstanceConfig();
-        
instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails());
+        instanceConfig.setFunctionDetails(functionDetails.build());
         // TODO: set correct function id and version when features implemented
         instanceConfig.setFunctionId(UUID.randomUUID().toString());
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
@@ -156,9 +170,9 @@ public class FunctionActioner implements AutoCloseable {
     }
 
     private void downloadFile(File pkgFile, boolean isPkgUrlProvided, 
FunctionMetaData functionMetaData, int instanceId) throws 
FileNotFoundException, IOException {
-        
+
         File pkgDir = pkgFile.getParentFile();
-        
+
         if (pkgFile.exists()) {
             log.warn("Function package exists already {} deleting it",
                     pkgFile);
@@ -178,7 +192,7 @@ public class FunctionActioner implements AutoCloseable {
         boolean downloadFromHttp = isPkgUrlProvided && 
pkgLocationPath.startsWith(HTTP);
         log.info("Function package file {} will be downloaded from {}", 
tempPkgFile,
                 downloadFromHttp ? pkgLocationPath : 
functionMetaData.getPackageLocation());
-        
+
         if(downloadFromHttp) {
             Utils.downloadFromHttpUrl(pkgLocationPath, new 
FileOutputStream(tempPkgFile));
         } else {
@@ -187,7 +201,7 @@ public class FunctionActioner implements AutoCloseable {
                     new FileOutputStream(tempPkgFile),
                     pkgLocationPath);
         }
-        
+
         try {
             // create a hardlink, if there are two concurrent createLink 
operations, one will fail.
             // this ensures one instance will successfully download the 
package.
@@ -243,4 +257,70 @@ public class FunctionActioner implements AutoCloseable {
                 },
                 File.separatorChar);
     }
+
+    public static boolean isFunctionCodeBuiltin(FunctionDetailsOrBuilder 
functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) 
throws IOException {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                File archive = 
connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
+                String sourceClass = 
ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
+                SourceSpec.Builder builder = 
SourceSpec.newBuilder(functionDetails.getSource());
+                builder.setClassName(sourceClass);
+                functionDetails.setSource(builder);
+
+                fillSourceSinkTypeClass(functionDetails, archive, sourceClass);
+                return archive;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                File archive = 
connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
+                String sinkClass = 
ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass();
+                SinkSpec.Builder builder = 
SinkSpec.newBuilder(functionDetails.getSink());
+                builder.setClassName(sinkClass);
+                functionDetails.setSink(builder);
+
+                fillSourceSinkTypeClass(functionDetails, archive, sinkClass);
+                return archive;
+            }
+        }
+
+        throw new IOException("Could not find built in archive definition");
+    }
+
+    private void fillSourceSinkTypeClass(FunctionDetails.Builder 
functionDetails, File archive, String className)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, 
Collections.emptySet())) {
+            String typeArg = getSourceType(className, ncl).getName();
+
+            SourceSpec.Builder sourceBuilder = 
SourceSpec.newBuilder(functionDetails.getSource());
+            sourceBuilder.setTypeClassName(typeArg);
+            functionDetails.setSource(sourceBuilder);
+
+            SinkSpec.Builder sinkBuilder = 
SinkSpec.newBuilder(functionDetails.getSink());
+            sinkBuilder.setTypeClassName(typeArg);
+            functionDetails.setSink(sinkBuilder);
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5c6184f..08de636 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -78,13 +78,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
+    private final ConnectorsManager connectorsManager;
 
 
     public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   PulsarClient pulsarClient,
                                   Namespace dlogNamespace,
-                                  MembershipManager membershipManager) throws 
Exception {
+                                  MembershipManager membershipManager,
+                                  ConnectorsManager connectorsManager) throws 
Exception {
         this.workerConfig = workerConfig;
+        this.connectorsManager = connectorsManager;
 
         Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
@@ -99,7 +102,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
                 .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
                 
.useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
                 
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
-        
+
         if (workerConfig.getThreadContainerFactory() != null) {
             this.runtimeFactory = new ThreadRuntimeFactory(
                     
workerConfig.getThreadContainerFactory().getThreadGroupName(),
@@ -121,7 +124,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, 
runtimeFactory,
-                dlogNamespace, actionQueue);
+                dlogNamespace, actionQueue, connectorsManager);
 
         this.membershipManager = membershipManager;
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0237700..4393331 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -102,12 +102,14 @@ public class WorkerService {
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client);
 
+            this.connectorsManager = new ConnectorsManager(workerConfig);
+
             //create membership manager
             this.membershipManager = new MembershipManager(this.workerConfig, 
this.client);
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig, this.client, this.dlogNamespace, 
this.membershipManager);
+                    this.workerConfig, this.client, this.dlogNamespace, 
this.membershipManager, connectorsManager);
 
             // Setting references to managers in scheduler
             
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -136,8 +138,6 @@ public class WorkerService {
             // indicate function worker service is done intializing
             this.isInitialized = true;
 
-            this.connectorsManager = new ConnectorsManager(workerConfig);
-
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index a9de4d7..4ff66c6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,6 +25,8 @@ import static org.apache.pulsar.functions.utils.Utils.HTTP;
 import static 
org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
 import static 
org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create;
 
+import com.google.gson.Gson;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -51,6 +53,8 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
@@ -64,6 +68,8 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
@@ -77,9 +83,6 @@ import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import com.google.gson.Gson;
-
-import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 
 @Slf4j
@@ -148,12 +151,17 @@ public class FunctionsImpl {
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder()
-                .setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                        : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
-        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
+        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
+        if (isBuiltin) {
+            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+        } else {
+            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
+                    : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
+        }
 
-        return isPkgUrlProvided ? 
updateRequest(functionMetaDataBuilder.build())
+        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
                 : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStream);
     }
 
@@ -193,12 +201,18 @@ public class FunctionsImpl {
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder()
-                .setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                        : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
-        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
+
+        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
+        if (isBuiltin) {
+            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+        } else {
+            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
+                    : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
+        }
 
-        return isPkgUrlProvided ? 
updateRequest(functionMetaDataBuilder.build())
+        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
                 : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStream);
     }
 
@@ -659,12 +673,53 @@ public class FunctionsImpl {
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             InputStream uploadedInputStream, FormDataContentDisposition 
fileDetail, String functionDetailsJson)
             throws IllegalArgumentException {
-        if (uploadedInputStream == null || fileDetail == null) {
+
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                functionDetailsJson, null);
+        if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == 
null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not 
provided");
         }
-        return validateUpdateRequestParams(tenant, namespace, functionName, 
functionDetailsJson, null);
+
+        return functionDetails;
+    }
+
+    private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
+    private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return sourceSpec.getBuiltin();
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return sinkSpec.getBuiltin();
+            }
+        }
+
+        return null;
+    }
+
+
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             String functionDetailsJson, File jarWithFileUrl) throws 
IllegalArgumentException {
         if (tenant == null) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index a1bcd4a..5754477 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -47,7 +47,7 @@ public class FunctionActionerTest {
 
     /**
      * Validates FunctionActioner tries to download file from bk.
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -68,7 +68,8 @@ public class FunctionActionerTest {
         LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue);
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+                new ConnectorsManager(workerConfig));
         Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -110,7 +111,8 @@ public class FunctionActionerTest {
         LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue);
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+                new ConnectorsManager(workerConfig));
 
         // (1) test with file url. functionActioner should be able to consider 
file-url and it should be able to call
         // RuntimeSpawner
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 690f474..8ab7473 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -92,7 +92,8 @@ public class FunctionRuntimeManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -185,7 +186,8 @@ public class FunctionRuntimeManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -282,7 +284,8 @@ public class FunctionRuntimeManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 6dd3fa3..6bf1343 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -135,7 +135,8 @@ public class MembershipManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
@@ -199,7 +200,8 @@ public class MembershipManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
@@ -288,7 +290,8 @@ public class MembershipManagerTest {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));

Reply via email to