This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 35cd182 Added ability to take python zip files that contain all their dependencies (#2915) 35cd182 is described below commit 35cd182f3fab0663bead931cd0ff6a0d7ce72b33 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Nov 2 16:17:18 2018 -0700 Added ability to take python zip files that contain all their dependencies (#2915) * Added ability to take python zip files that contain all their dependencies * Exit on failure --- .../src/main/python/python_instance_main.py | 24 +++++++++++++++- pulsar-functions/python-examples/exclamation.zip | Bin 0 -> 39311 bytes .../integration/functions/PulsarFunctionsTest.java | 32 ++++++++++++++------- .../functions/PulsarFunctionsTestBase.java | 12 ++++++-- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 748923e..977345c 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -97,11 +97,33 @@ def main(): if args.extra_dependency_repository: cmd = cmd + " --extra-index-url %s" % str(args.extra_dependency_repository) cmd = cmd + " %s" % str(args.py) - os.system(cmd) + retval = os.system(cmd) + if retval != 0: + print "Could not install user depedencies" + sys.exit(1) else: zpfile = zipfile.ZipFile(str(args.py), 'r') zpfile.extractall(os.path.dirname(str(args.py))) sys.path.insert(0, os.path.dirname(str(args.py))) + elif os.path.splitext(str(args.py))[1] == '.zip': + # Assumig zip file with format func.zip + # extract to folder function + # internal dir format + # "func/src" + # "func/requirements.txt" + # "func/deps" + # run pip install to target folder deps folder + zpfile = zipfile.ZipFile(str(args.py), 'r') + zpfile.extractall(os.path.dirname(str(args.py))) + basename = os.path.splitext(str(args.py))[0] + requirements_txt_file = os.path.join(os.path.dirname(str(args.py)), basename, "requirements.txt") + deps_file = os.path.join(os.path.dirname(str(args.py)), basename, "deps") + cmd = "pip install -t %s -r %s --no-index --find-links %s" % (os.path.dirname(str(args.py)), requirements_txt_file, deps_file) + retval = os.system(cmd) + if retval != 0: + print "Could not install user depedencies specified by the zip file" + sys.exit(1) + sys.path.insert(0, os.path.join(os.path.dirname(str(args.py)), basename, "src")) log_file = os.path.join(args.logging_directory, util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name), diff --git a/pulsar-functions/python-examples/exclamation.zip b/pulsar-functions/python-examples/exclamation.zip new file mode 100644 index 0000000..ff0f213 Binary files /dev/null and b/pulsar-functions/python-examples/exclamation.zip differ diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 0222bd6..8b52358 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -613,25 +613,30 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { @Test(enabled = false) public void testPythonExclamationFunction() throws Exception { - testExclamationFunction(Runtime.PYTHON, false); + testExclamationFunction(Runtime.PYTHON, false, false); + } + + @Test(enabled = false) + public void testPythonExclamationZipFunction() throws Exception { + testExclamationFunction(Runtime.PYTHON, false, true); } @Test(enabled = false) public void testPythonExclamationTopicPatternFunction() throws Exception { - testExclamationFunction(Runtime.PYTHON, true); + testExclamationFunction(Runtime.PYTHON, true, false); } @Test public void testJavaExclamationFunction() throws Exception { - testExclamationFunction(Runtime.JAVA, false); + testExclamationFunction(Runtime.JAVA, false, false); } @Test public void testJavaExclamationTopicPatternFunction() throws Exception { - testExclamationFunction(Runtime.JAVA, true); + testExclamationFunction(Runtime.JAVA, true, false); } - private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception { + private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, boolean pyZip) throws Exception { if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) { // python can only run on process mode return; @@ -660,7 +665,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // submit the exclamation function submitExclamationFunction( - runtime, inputTopicName, outputTopicName, functionName); + runtime, inputTopicName, outputTopicName, functionName, pyZip); // get function info getFunctionInfoSuccess(functionName); @@ -681,13 +686,15 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { private static void submitExclamationFunction(Runtime runtime, String inputTopicName, String outputTopicName, - String functionName) throws Exception { + String functionName, + boolean pyZip) throws Exception { submitFunction( runtime, inputTopicName, outputTopicName, functionName, - getExclamationClass(runtime), + pyZip, + getExclamationClass(runtime, pyZip), Schema.STRING); } @@ -695,6 +702,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName, String outputTopicName, String functionName, + boolean pyZip, String functionClass, Schema<T> inputTopicSchema) throws Exception { CommandGenerator generator; @@ -713,7 +721,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { command = generator.generateCreateFunctionCommand(); } else if (Runtime.PYTHON == runtime) { generator.setRuntime(runtime); - command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE); + if (pyZip) { + command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE); + } else { + command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE); + } } else { throw new IllegalArgumentException("Unsupported runtime : " + runtime); } @@ -860,7 +872,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // submit the exclamation function submitFunction( - Runtime.JAVA, inputTopicName, outputTopicName, functionName, + Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, AutoSchemaFunction.class.getName(), Schema.AVRO(CustomObject.class)); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index 7a47f77..9acdf1f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -76,13 +76,21 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite { public static final String EXCLAMATION_PYTHON_CLASS = "exclamation.ExclamationFunction"; + public static final String EXCLAMATION_PYTHONZIP_CLASS = + "exclamation"; + public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py"; + public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip"; - protected static String getExclamationClass(Runtime runtime) { + protected static String getExclamationClass(Runtime runtime, boolean pyZip) { if (Runtime.JAVA == runtime) { return EXCLAMATION_JAVA_CLASS; } else if (Runtime.PYTHON == runtime) { - return EXCLAMATION_PYTHON_CLASS; + if (pyZip) { + return EXCLAMATION_PYTHONZIP_CLASS; + } else { + return EXCLAMATION_PYTHON_CLASS; + } } else { throw new IllegalArgumentException("Unsupported runtime : " + runtime); }