This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3095d8  Classloader choice for validating Source/Sink (#3865)
f3095d8 is described below

commit f3095d8697ccbe62e4676f94e671047a82bebe40
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Mar 28 21:22:37 2019 -0700

    Classloader choice for validating Source/Sink (#3865)
    
    * Try both regular classloader as well as nar class loader for validating 
source/sinks
    
    * Fixed test
    
    * Fix unittest
    
    * Added more comments to the code
    
    * rename variables
    
    * Wait for the create to succeed before updating. Otherwise there might be 
some reamnant producers
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  8 +++
 .../pulsar/functions/utils/SinkConfigUtils.java    | 34 +++++++++--
 .../pulsar/functions/utils/SourceConfigUtils.java  | 32 ++++++++--
 .../functions/worker/rest/api/ComponentImpl.java   |  3 +
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  2 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  2 +-
 .../integration/functions/PulsarFunctionsTest.java | 68 ++++++++++++++++++++++
 7 files changed, 137 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 422e9ed..042a952 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -599,6 +599,14 @@ public class PulsarFunctionE2ETest {
         SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, functionName, sinkTopic);
         admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
 
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() 
== 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
         admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 42538a2..81554c4 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -46,6 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 
+@Slf4j
 public class SinkConfigUtils {
 
     @Getter
@@ -296,14 +298,37 @@ public class SinkConfigUtils {
         }
 
         String sinkClassName;
-        ClassLoader classLoader;
+        final Class<?> typeArg;
+        final ClassLoader classLoader;
         if (!isEmpty(sinkConfig.getClassName())) {
             sinkClassName = sinkConfig.getClassName();
+            // We really don't know if we should use nar class loader or 
regular classloader
+            ClassLoader jarClassLoader = null;
+            ClassLoader narClassLoader = null;
             try {
-                classLoader = Utils.extractClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
+                jarClassLoader = Utils.extractClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
-                throw new IllegalArgumentException("Invalid Sink Jar");
             }
+            try {
+                narClassLoader = Utils.extractNarClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+            }
+            if (jarClassLoader == null && narClassLoader == null) {
+                throw new IllegalArgumentException("Invalid Sink Package");
+            }
+            // We use typeArg and classLoader as arguments for lambda 
functions that require them to be final
+            // Thus we use these tmp vars
+            Class<?> tmptypeArg;
+            ClassLoader tmpclassLoader;
+            try {
+                tmptypeArg = getSinkType(sinkClassName, narClassLoader);
+                tmpclassLoader = narClassLoader;
+            } catch (Exception e) {
+                tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+                tmpclassLoader = jarClassLoader;
+            }
+            typeArg = tmptypeArg;
+            classLoader = tmpclassLoader;
         } else if 
(!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && 
sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
 {
             throw new IllegalArgumentException("Class-name must be present for 
archive with file-url");
         } else {
@@ -316,10 +341,9 @@ public class SinkConfigUtils {
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract sink 
class from archive", e1);
             }
+            typeArg = getSinkType(sinkClassName, classLoader);
         }
 
-        Class<?> typeArg = getSinkType(sinkClassName, classLoader);
-
         if (sinkConfig.getTopicToSerdeClassName() != null) {
             sinkConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
                 ValidatorUtils.validateSerde(serdeClassName, typeArg, 
classLoader, true);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index f721112..9a32085 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -209,14 +209,37 @@ public class SourceConfigUtils {
         }
 
         String sourceClassName;
-        ClassLoader classLoader;
+        final Class<?> typeArg;
+        final ClassLoader classLoader;
         if (!isEmpty(sourceConfig.getClassName())) {
             sourceClassName = sourceConfig.getClassName();
+            // We really don't know if we should use nar class loader or 
regular classloader
+            ClassLoader jarClassLoader = null;
+            ClassLoader narClassLoader = null;
             try {
-                classLoader = Utils.extractClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
+                jarClassLoader = Utils.extractClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
-                throw new IllegalArgumentException("Invalid Source Jar");
             }
+            try {
+                narClassLoader = Utils.extractNarClassLoader(archivePath, 
functionPkgUrl, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+            }
+            if (jarClassLoader == null && narClassLoader == null) {
+                throw new IllegalArgumentException("Invalid Source Package");
+            }
+            // We use typeArg and classLoader as arguments for lambda 
functions that require them to be final
+            // Thus we use these tmp vars
+            Class<?> tmptypeArg;
+            ClassLoader tmpclassLoader;
+            try {
+                tmptypeArg = getSourceType(sourceClassName, narClassLoader);
+                tmpclassLoader = narClassLoader;
+            } catch (Exception e) {
+                tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
+                tmpclassLoader = jarClassLoader;
+            }
+            typeArg = tmptypeArg;
+            classLoader = tmpclassLoader;
         } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && 
sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE))
 {
             throw new IllegalArgumentException("Class-name must be present for 
archive with file-url");
         } else {
@@ -229,10 +252,9 @@ public class SourceConfigUtils {
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract source 
class from archive", e1);
             }
+            typeArg = getSourceType(sourceClassName, classLoader);
         }
 
-        Class<?> typeArg = getSourceType(sourceClassName, classLoader);
-
         // Only one of serdeClassName or schemaType should be set
         if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && 
!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
             throw new IllegalArgumentException("Only one of serdeClassName or 
schemaType should be set");
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0ed9fd1..fd75da3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -588,6 +588,9 @@ public abstract class ComponentImpl {
             } else if (uploadedInputStream != null) {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName, uploadedInputStreamAsFile,
                         fileDetail, functionDetailsJson, 
mergedComponentConfigJson, componentType);
+            } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://"))
 {
+                functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName, null,
+                        null, functionDetailsJson, mergedComponentConfigJson, 
componentType);
             } else {
                 functionDetails = 
validateUpdateRequestParamsWithExistingMetadata(
                         tenant, namespace, componentName, 
existingComponent.getPackageLocation(), mergedComponentConfigJson, 
componentType);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 192fd1a..b723411 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -361,7 +361,7 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Invalid Sink Jar")
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Invalid Sink Package")
     public void testRegisterSinkHttpUrl() {
         try {
             testRegisterSinkMissingArguments(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 25a0845..adaa606 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -325,7 +325,7 @@ public class SourceApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Invalid Source Jar")
+    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Invalid Source Package")
     public void testRegisterSourceHttpUrl() {
         try {
             testRegisterSourceMissingArguments(
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d8e5f50..a60eb47 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -172,6 +172,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // validate the sink result
         tester.validateSinkResult(kvs);
 
+        // update the sink
+        updateSinkConnector(tester, tenant, namespace, sinkName, 
inputTopicName);
+
         // delete the sink
         deleteSink(tenant, namespace, sinkName);
 
@@ -220,6 +223,45 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             result.getStdout());
     }
 
+    protected void updateSinkConnector(SinkTester tester,
+                                       String tenant,
+                                       String namespace,
+                                       String sinkName,
+                                       String inputTopicName) throws Exception 
{
+        String[] commands;
+        if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "update",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--sink-type", tester.sinkType().name().toLowerCase(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName,
+                    "--parallelism", "2"
+            };
+        } else {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "create",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--archive", tester.getSinkArchive(),
+                    "--classname", tester.getSinkClassName(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName,
+                    "--parallelism", "2"
+            };
+        }
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Updated successfully\""),
+                result.getStdout());
+    }
+
     protected void getSinkInfoSuccess(SinkTester tester,
                                       String tenant,
                                       String namespace,
@@ -422,6 +464,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // validate the source result
         validateSourceResult(consumer, kvs);
 
+        // update the source connector
+        updateSourceConnector(tester, tenant, namespace, sourceName, 
outputTopicName);
+
         // delete the source
         deleteSource(tenant, namespace, sourceName);
 
@@ -455,6 +500,29 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             result.getStdout());
     }
 
+    protected void updateSourceConnector(SourceTester tester,
+                                         String tenant,
+                                         String namespace,
+                                         String sourceName,
+                                         String outputTopicName) throws 
Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "source", "update",
+                "--tenant", tenant,
+                "--namespace", namespace,
+                "--name", sourceName,
+                "--source-type", tester.sourceType(),
+                "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+                "--destinationTopicName", outputTopicName,
+                "--parallelism", "2"
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Updated successfully\""),
+                result.getStdout());
+    }
+
     protected void getSourceInfoSuccess(SourceTester tester,
                                         String tenant,
                                         String namespace,

Reply via email to