This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 752f1b4  Derive source/sink arg-class name from function-class for 
file-url (#2258)
752f1b4 is described below

commit 752f1b44359bc83f0f11ddd2f0cb18f4f70b712c
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Jul 31 11:32:49 2018 -0700

    Derive source/sink arg-class name from function-class for file-url (#2258)
    
    * Derive source/sink arg-class name from function-class for file-url archive
    
    * fix set type-arg if src/sink arg-class is not set
    
    * add unit-test
---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 99 ++++++++++++++++------
 .../org/apache/pulsar/functions/utils/Utils.java   | 15 ++--
 .../functions/worker/rest/api/FunctionsImpl.java   | 42 ++++++---
 3 files changed, 113 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 16f1a76..5398bc9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.io;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.io.File;
 import java.lang.reflect.Method;
@@ -62,7 +66,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -70,11 +73,9 @@ import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.WorkerServer;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -124,7 +125,7 @@ public class PulsarSinkE2ETest {
     public Object[][] validRoleName() {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
-    
+
     @BeforeMethod
     void setup(Method method) throws Exception {
 
@@ -147,7 +148,6 @@ public class PulsarSinkE2ETest {
         config.setBrokerServicePortTls(brokerServiceTlsPort);
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
 
-
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);
@@ -156,7 +156,6 @@ public class PulsarSinkE2ETest {
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
-    
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
@@ -190,12 +189,12 @@ public class PulsarSinkE2ETest {
                     workerConfig.getClientAuthenticationParameters());
         }
         pulsarClient = clientBuilder.build();
-       
+
         TenantInfo propAdmin = new TenantInfo();
         propAdmin.getAdminRoles().add("superUser");
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
-       
+
         Thread.sleep(100);
     }
 
@@ -237,7 +236,7 @@ public class PulsarSinkE2ETest {
         workerConfig.setUseTls(true);
         workerConfig.setTlsAllowInsecureConnection(true);
         workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
-        
+
         workerConfig.setAuthenticationEnabled(true);
         workerConfig.setAuthorizationEnabled(true);
 
@@ -285,7 +284,7 @@ public class PulsarSinkE2ETest {
             }
         }, 5, 150);
         // validate pulsar sink consumer has started on the topic
-        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
         int totalMsgs = 5;
         for (int i = 0; i < totalMsgs; i++) {
@@ -303,17 +302,15 @@ public class PulsarSinkE2ETest {
 
         Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
         String receivedPropertyValue = msg.getProperty(propertyKey);
-        Assert.assertEquals(propertyValue, receivedPropertyValue);
+        assertEquals(propertyValue, receivedPropertyValue);
 
         // validate pulsar-sink consumer has consumed all messages and 
delivered to Pulsar sink but unacked messages
         // due to publish failure
-        Assert.assertNotEquals(
-                
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+        
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
                 totalMsgs);
 
     }
 
-    
     @Test(timeOut = 20000)
     public void testPulsarSinkStats() throws Exception {
 
@@ -349,7 +346,7 @@ public class PulsarSinkE2ETest {
             }
         }, 5, 150);
         // validate pulsar sink consumer has started on the topic
-        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
@@ -371,7 +368,7 @@ public class PulsarSinkE2ETest {
                 functionName);
 
         int numInstances = functionStats.getFunctionStatusListCount();
-        Assert.assertEquals(numInstances, 1);
+        assertEquals(numInstances, 1);
 
         FunctionStatus stats = 
functionStats.getFunctionStatusListList().get(0);
         Map<String, DataDigest> metricsData = 
stats.getMetrics().getMetricsMap();
@@ -379,12 +376,13 @@ public class PulsarSinkE2ETest {
         double count = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
         double success = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
         String ownerWorkerId = stats.getWorkerId();
-        Assert.assertEquals((int) count, totalMsgs);
-        Assert.assertEquals((int) success, totalMsgs);
-        Assert.assertEquals(ownerWorkerId, workerId);
+        assertEquals((int) count, totalMsgs);
+        assertEquals((int) success, totalMsgs);
+        assertEquals(ownerWorkerId, workerId);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName,
+            String sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -407,7 +405,7 @@ public class PulsarSinkE2ETest {
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
         
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
-        sourceSpecBuilder.setTypeClassName(byte[].class.getName());
+        sourceSpecBuilder.setTypeClassName(typeArg.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
         sourceSpecBuilder.setSubscriptionName(subscriptionName);
         sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, 
DefaultSerDe.class.getName());
@@ -425,7 +423,7 @@ public class PulsarSinkE2ETest {
 
         return functionDetailsBuilder.build();
     }
-    
+
     @Test(dataProvider = "validRoleName")
     public void testAuthorization(boolean validRoleName) throws Exception {
 
@@ -450,9 +448,62 @@ public class PulsarSinkE2ETest {
                 sinkTopic, subscriptionName);
         try {
             admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
-            Assert.assertTrue(validRoleName);
+            assertTrue(validRoleName);
         } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) 
{
-            Assert.assertFalse(validRoleName);
+            assertFalse(validRoleName);
         }
     }
+
+    /**
+     * Test to verify: function-server loads jar using file-url and derives 
type-args classes if not provided
+     * @throws Exception
+     */
+    @Test(timeOut = 20000)
+    public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
IdentityFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+        functionDetailsBuilder.setTenant(tenant);
+        functionDetailsBuilder.setNamespace(namespacePortion);
+        functionDetailsBuilder.setName(functionName);
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(1);
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
+        Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(new 
IdentityFunction(), false);
+
+        // set source spec
+        // source spec classname should be empty so that the default pulsar 
source will be used
+        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, 
DefaultSerDe.class.getName());
+        functionDetailsBuilder.setAutoAck(true);
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        sinkSpecBuilder.setTopic(sinkTopic);
+        Map<String, Object> sinkConfigMap = Maps.newHashMap();
+        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        FunctionDetails functionDetails = functionDetailsBuilder.build();
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        FunctionDetails functionMetadata = 
admin.functions().getFunction(tenant, namespacePortion, functionName);
+
+        assertEquals(functionMetadata.getSource().getTypeClassName(), 
typeArgs[0].getName());
+        assertEquals(functionMetadata.getSink().getTypeClassName(), 
typeArgs[1].getName());
+
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 0c25be2..94c315d 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -102,16 +102,21 @@ public class Utils {
     }
 
     public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
-
-        Object userClass = createInstance(functionConfig.getClassName(), 
Thread.currentThread().getContextClassLoader());
+        Object userClass = createInstance(functionConfig.getClassName(),
+                Thread.currentThread().getContextClassLoader());
+        boolean isWindowConfigPresent = functionConfig.getWindowConfig() != 
null;
+        return getFunctionTypes(userClass, isWindowConfigPresent);
+    }
+    
+    public static Class<?>[] getFunctionTypes(Object userClass, boolean 
isWindowConfigPresent) {
 
         Class<?>[] typeArgs;
         // if window function
-        if (functionConfig.getWindowConfig() != null) {
+        if (isWindowConfigPresent) {
             java.util.function.Function function = 
(java.util.function.Function) userClass;
             if (function == null) {
-                throw new IllegalArgumentException(String.format("The Java 
util function class %s could not be instantiated",
-                        functionConfig.getClassName()));
+                throw new IllegalArgumentException(
+                        String.format("The Java util function class %s could 
not be instantiated", userClass));
             }
             typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
             if (!typeArgs[0].equals(Collection.class)) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b935bf5..9e3944b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -56,7 +56,10 @@ import javax.ws.rs.core.StreamingOutput;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.join;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -139,7 +142,7 @@ public class FunctionsImpl {
         }
 
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
         try {
             if (isPkgUrlProvided) {
@@ -203,7 +206,7 @@ public class FunctionsImpl {
         }
         
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
         try {
             if (isPkgUrlProvided) {
@@ -738,14 +741,14 @@ public class FunctionsImpl {
     private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
-            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+            if (!isEmpty(sourceSpec.getBuiltin())) {
                 return true;
             }
         }
 
         if (functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
-            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+            if (!isEmpty(sinkSpec.getBuiltin())) {
                 return true;
             }
         }
@@ -756,14 +759,14 @@ public class FunctionsImpl {
     private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
-            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+            if (!isEmpty(sourceSpec.getBuiltin())) {
                 return sourceSpec.getBuiltin();
             }
         }
 
         if (functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
-            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+            if (!isEmpty(sinkSpec.getBuiltin())) {
                 return sinkSpec.getBuiltin();
             }
         }
@@ -815,7 +818,7 @@ public class FunctionsImpl {
                 missingFields.add("Sink");
             }
             if (!missingFields.isEmpty()) {
-                String errorMessage = StringUtils.join(missingFields, ",");
+                String errorMessage = join(missingFields, ",");
                 throw new IllegalArgumentException(errorMessage + " is not 
provided");
             }
             if (functionDetails.getParallelism() <= 0) {
@@ -837,7 +840,7 @@ public class FunctionsImpl {
             return;
         }
 
-        if (StringUtils.isBlank(functionDetailsBuilder.getClassName())) {
+        if (isBlank(functionDetailsBuilder.getClassName())) {
             throw new IllegalArgumentException("function class-name can't be 
empty");
         }
 
@@ -847,13 +850,15 @@ public class FunctionsImpl {
 
         // validate function class-type
         Object functionObject = 
createInstance(functionDetailsBuilder.getClassName(), classLoader);
+        Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
+        
         if (!(functionObject instanceof 
org.apache.pulsar.functions.api.Function)
                 && !(functionObject instanceof java.util.function.Function)) {
             throw new RuntimeException("User class must either be Function or 
java.util.Function");
         }
-
+        
         if (functionDetailsBuilder.hasSource() && 
functionDetailsBuilder.getSource() != null
-                && 
StringUtils.isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
+                && 
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
             try {
                 String sourceClassName = 
functionDetailsBuilder.getSource().getClassName();
                 String argClassName = getTypeArg(sourceClassName, 
Source.class, classLoader).getName();
@@ -862,7 +867,7 @@ public class FunctionsImpl {
 
                 // if sink-class not present then set same arg as source
                 if (!functionDetailsBuilder.hasSink()
-                        || 
StringUtils.isBlank(functionDetailsBuilder.getSink().getClassName())) {
+                        || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
                     functionDetailsBuilder
                             
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                 }
@@ -873,10 +878,14 @@ public class FunctionsImpl {
                 log.error("Failed to validate source class", e);
                 throw new IllegalArgumentException("Failed to validate source 
class-name", e);
             }
+        } else if 
(isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
+            // if function-src-class is not present then set function-src 
type-class according to function class
+            functionDetailsBuilder
+                    
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
         }
 
         if (functionDetailsBuilder.hasSink() && 
functionDetailsBuilder.getSink() != null
-                && 
StringUtils.isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
+                && 
isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
             try {
                 String sinkClassName = 
functionDetailsBuilder.getSink().getClassName();
                 String argClassName = getTypeArg(sinkClassName, Sink.class, 
classLoader).getName();
@@ -884,7 +893,7 @@ public class FunctionsImpl {
 
                 // if source-class not present then set same arg as sink
                 if (!functionDetailsBuilder.hasSource()
-                        || 
StringUtils.isBlank(functionDetailsBuilder.getSource().getClassName())) {
+                        || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
                     functionDetailsBuilder
                             
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                 }
@@ -895,7 +904,12 @@ public class FunctionsImpl {
                 log.error("Failed to validate sink class", e);
                 throw new IllegalArgumentException("Failed to validate sink 
class-name", e);
             }
+        } else 
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+            // if function-sink-class is not present then set function-sink 
type-class according to function class
+            functionDetailsBuilder
+                    
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
         }
+
     }
 
     private Class<?> getTypeArg(String className, Class<?> funClass, 
URLClassLoader classLoader)

Reply via email to