This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35cd182  Added ability to take python zip files that contain all their 
dependencies (#2915)
35cd182 is described below

commit 35cd182f3fab0663bead931cd0ff6a0d7ce72b33
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Nov 2 16:17:18 2018 -0700

    Added ability to take python zip files that contain all their dependencies 
(#2915)
    
    * Added ability to take python zip files that contain all their dependencies
    
    * Exit on failure
---
 .../src/main/python/python_instance_main.py        |  24 +++++++++++++++-
 pulsar-functions/python-examples/exclamation.zip   | Bin 0 -> 39311 bytes
 .../integration/functions/PulsarFunctionsTest.java |  32 ++++++++++++++-------
 .../functions/PulsarFunctionsTestBase.java         |  12 ++++++--
 4 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 748923e..977345c 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -97,11 +97,33 @@ def main():
       if args.extra_dependency_repository:
         cmd = cmd + " --extra-index-url %s" % 
str(args.extra_dependency_repository)
       cmd = cmd + " %s" % str(args.py)
-      os.system(cmd)
+      retval = os.system(cmd)
+      if retval != 0:
+        print "Could not install user depedencies"
+        sys.exit(1)
     else:
       zpfile = zipfile.ZipFile(str(args.py), 'r')
       zpfile.extractall(os.path.dirname(str(args.py)))
     sys.path.insert(0, os.path.dirname(str(args.py)))
+  elif os.path.splitext(str(args.py))[1] == '.zip':
+    # Assumig zip file with format func.zip
+    # extract to folder function
+    # internal dir format
+    # "func/src"
+    # "func/requirements.txt"
+    # "func/deps"
+    # run pip install to target folder  deps folder
+    zpfile = zipfile.ZipFile(str(args.py), 'r')
+    zpfile.extractall(os.path.dirname(str(args.py)))
+    basename = os.path.splitext(str(args.py))[0]
+    requirements_txt_file = os.path.join(os.path.dirname(str(args.py)), 
basename, "requirements.txt")
+    deps_file = os.path.join(os.path.dirname(str(args.py)), basename, "deps")
+    cmd = "pip install -t %s -r %s --no-index --find-links %s" % 
(os.path.dirname(str(args.py)), requirements_txt_file, deps_file)
+    retval = os.system(cmd)
+    if retval != 0:
+      print "Could not install user depedencies specified by the zip file"
+      sys.exit(1)
+    sys.path.insert(0, os.path.join(os.path.dirname(str(args.py)), basename, 
"src"))
 
   log_file = os.path.join(args.logging_directory,
                           
util.getFullyQualifiedFunctionName(function_details.tenant, 
function_details.namespace, function_details.name),
diff --git a/pulsar-functions/python-examples/exclamation.zip 
b/pulsar-functions/python-examples/exclamation.zip
new file mode 100644
index 0000000..ff0f213
Binary files /dev/null and b/pulsar-functions/python-examples/exclamation.zip 
differ
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0222bd6..8b52358 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -613,25 +613,30 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
     @Test(enabled = false)
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false);
+        testExclamationFunction(Runtime.PYTHON, false, false);
+    }
+
+    @Test(enabled = false)
+    public void testPythonExclamationZipFunction() throws Exception {
+        testExclamationFunction(Runtime.PYTHON, false, true);
     }
 
     @Test(enabled = false)
     public void testPythonExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, true);
+        testExclamationFunction(Runtime.PYTHON, true, false);
     }
 
     @Test
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, false);
+        testExclamationFunction(Runtime.JAVA, false, false);
     }
 
     @Test
     public void testJavaExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, true);
+        testExclamationFunction(Runtime.JAVA, true, false);
     }
 
-    private void testExclamationFunction(Runtime runtime, boolean 
isTopicPattern) throws Exception {
+    private void testExclamationFunction(Runtime runtime, boolean 
isTopicPattern, boolean pyZip) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == 
Runtime.PYTHON) {
             // python can only run on process mode
             return;
@@ -660,7 +665,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         // submit the exclamation function
         submitExclamationFunction(
-            runtime, inputTopicName, outputTopicName, functionName);
+            runtime, inputTopicName, outputTopicName, functionName, pyZip);
 
         // get function info
         getFunctionInfoSuccess(functionName);
@@ -681,13 +686,15 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     private static void submitExclamationFunction(Runtime runtime,
                                                   String inputTopicName,
                                                   String outputTopicName,
-                                                  String functionName) throws 
Exception {
+                                                  String functionName,
+                                                  boolean pyZip) throws 
Exception {
         submitFunction(
             runtime,
             inputTopicName,
             outputTopicName,
             functionName,
-            getExclamationClass(runtime),
+            pyZip,
+            getExclamationClass(runtime, pyZip),
             Schema.STRING);
     }
 
@@ -695,6 +702,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                            String inputTopicName,
                                            String outputTopicName,
                                            String functionName,
+                                           boolean pyZip,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws 
Exception {
         CommandGenerator generator;
@@ -713,7 +721,11 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             command = generator.generateCreateFunctionCommand();
         } else if (Runtime.PYTHON == runtime) {
             generator.setRuntime(runtime);
-            command = 
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
+            if (pyZip) {
+                command = 
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
+            } else {
+                command = 
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
+            }
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
         }
@@ -860,7 +872,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         // submit the exclamation function
         submitFunction(
-            Runtime.JAVA, inputTopicName, outputTopicName, functionName,
+            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false,
             AutoSchemaFunction.class.getName(),
             Schema.AVRO(CustomObject.class));
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 7a47f77..9acdf1f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -76,13 +76,21 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarTestSuite {
     public static final String EXCLAMATION_PYTHON_CLASS =
         "exclamation.ExclamationFunction";
 
+    public static final String EXCLAMATION_PYTHONZIP_CLASS =
+            "exclamation";
+
     public static final String EXCLAMATION_PYTHON_FILE = 
"exclamation_function.py";
+    public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
 
-    protected static String getExclamationClass(Runtime runtime) {
+    protected static String getExclamationClass(Runtime runtime, boolean 
pyZip) {
         if (Runtime.JAVA == runtime) {
             return EXCLAMATION_JAVA_CLASS;
         } else if (Runtime.PYTHON == runtime) {
-            return EXCLAMATION_PYTHON_CLASS;
+            if (pyZip) {
+                return EXCLAMATION_PYTHONZIP_CLASS;
+            } else {
+                return EXCLAMATION_PYTHON_CLASS;
+            }
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
         }

Reply via email to