Author: daijy Date: Mon Jul 26 16:54:45 2010 New Revision: 979361 URL: http://svn.apache.org/viewvc?rev=979361&view=rev Log: PIG-928: UDFs in scripting languages
Added: hadoop/pig/trunk/src/org/apache/pig/scripting/ hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java hadoop/pig/trunk/src/org/apache/pig/scripting/jython/ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/build.xml hadoop/pig/trunk/ivy.xml hadoop/pig/trunk/ivy/libraries.properties hadoop/pig/trunk/ivy/pig.pom hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj hadoop/pig/trunk/test/findbugsExcludeFile.xml hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Jul 26 16:54:45 2010 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-928: UDFs in scripting languages (daijy) + PIG-1509: Add .gitignore file (cwsteinbach via gates) PIG-1478: Add progress notification listener to PigRunner API (rding) Modified: hadoop/pig/trunk/build.xml URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- 
hadoop/pig/trunk/build.xml (original) +++ hadoop/pig/trunk/build.xml Mon Jul 26 16:54:45 2010 @@ -174,6 +174,7 @@ <fileset file="${ivy.lib.dir}/jackson-mapper-asl-${jackson.version}.jar"/> <fileset file="${ivy.lib.dir}/jackson-core-asl-${jackson.version}.jar"/> <fileset file="${ivy.lib.dir}/joda-time-${joda-time.version}.jar" /> + <fileset file="${ivy.lib.dir}/jython-${jython.version}.jar" /> <!-- <fileset file="${lib.dir}/commons-collections-3.2.jar" /> --> </path> Modified: hadoop/pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy.xml?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/ivy.xml (original) +++ hadoop/pig/trunk/ivy.xml Mon Jul 26 16:54:45 2010 @@ -72,6 +72,7 @@ <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}" conf="compile->master"/> <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" conf="compile->master"/> + <dependency org="org.python" name="jython" rev="${jython.version}" conf="compile->master"/> <!--ATM hbase, hbase-test.jar, hadoop.jar are resolved from the lib dir--> </dependencies> </ivy-module> Modified: hadoop/pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/ivy/libraries.properties (original) +++ hadoop/pig/trunk/ivy/libraries.properties Mon Jul 26 16:54:45 2010 @@ -36,3 +36,4 @@ xerces.version=1.4.4 jackson.version=1.0.1 joda-time.version=1.6 +jython.version=2.5.0 Modified: hadoop/pig/trunk/ivy/pig.pom URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/pig.pom?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/ivy/pig.pom (original) +++ hadoop/pig/trunk/ivy/pig.pom Mon Jul 26 
16:54:45 2010 @@ -95,5 +95,11 @@ <version>${joda-time.version}</version> </dependency> + <dependency> + <groupId>org.python</groupId> + <artifactId>jython</artifactId> + <version>${jython.version}</version> + </dependency> + </dependencies> </project> Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Jul 26 16:54:45 2010 @@ -96,6 +96,7 @@ import org.apache.pig.impl.util.LogUtils import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.PropertiesUtil; import org.apache.pig.pen.ExampleGenerator; +import org.apache.pig.scripting.ScriptEngine; import org.apache.pig.tools.grunt.GruntParser; import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; import org.apache.pig.tools.pigstats.OutputStats; @@ -468,6 +469,31 @@ public class PigServer { } /** + * Universal Scripting Language Support, see PIG-928 + * + * @param path path of the script file + * @param scriptingLang language keyword or scriptingEngine used to interpret the script + * @param namespace namespace defined for functions of this script + * @throws IOException + */ + public void registerCode(String path, String scriptingLang, String namespace) + throws IOException { + File f = new File(path); + + if (!f.canRead()) { + int errCode = 4002; + String msg = "Can't read file: " + path; + throw new FrontendException(msg, errCode, + PigException.USER_ENVIRONMENT); + } + if(scriptingLang != null) { + ScriptEngine se = ScriptEngine.getInstance(scriptingLang); + se.registerFunctions(path, namespace, pigContext); + } + pigContext.addScriptFile(path); + } + + /** * Register a query with the Pig runtime. 
The query is parsed and registered, but it is not * executed until it is needed. * Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Jul 26 16:54:45 2010 @@ -221,7 +221,9 @@ public class POUserFunc extends Expressi } } if(resultType == DataType.BYTEARRAY) { - if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) { + // This is needed if some EvalFunc has default datatype as bytearray and returns arbitrary objects + // We see such behavior in case of script EvalFunc, which is used to run udfs in scripting langs + if(result.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) { result.result = new DataByteArray(result.result.toString().getBytes()); } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Jul 26 16:54:45 2010 @@ -90,6 +90,12 @@ public class PigContext implements Seria private Properties properties; + // script files that are needed to run a job + public List<String> scriptFiles = new ArrayList<String>(); + + // script jars that are needed to run a script - 
jython.jar etc + public List<String> scriptJars = new ArrayList<String>(2); + /** * a table mapping function names to function specs. */ @@ -205,6 +211,12 @@ public class PigContext implements Seria } } + public void addScriptFile(String path) throws MalformedURLException { + if (path != null) { + scriptFiles.add(path); + } + } + public void addJar(String path) throws MalformedURLException { if (path != null) { URL resource = (new File(path)).toURI().toURL(); @@ -381,7 +393,6 @@ public class PigContext implements Seria - /** * Creates a Classloader based on the passed jarFile and any extra jar files. Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jul 26 16:54:45 2010 @@ -3256,7 +3256,6 @@ ExpressionOperator EvalFuncSpec(Schema o { String funcName = null; FuncSpec funcSpec = null; - String funcNameAlias = null; boolean registeredFunction = false; List<ExpressionOperator> args; ExpressionOperator userFunc; @@ -3265,30 +3264,28 @@ ExpressionOperator EvalFuncSpec(Schema o log.trace("Entering EvalFuncSpec"); } { - ( - ( - LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction() - { - func = pigContext.instantiateFuncFromAlias(funcNameAlias); - try{ - FunctionType.tryCasting(func, funcType); - } catch (Exception e){ + // get the func object + funcName = QualifiedFunction() + { + // look into defined functions map and try casting + // This supports namespace.func, jarname.package.func scenarios + try{ + func = pigContext.instantiateFuncFromAlias(funcName); + 
FunctionType.tryCasting(func, funcType); + }catch (Exception e){ ParseException pe = new ParseException(e.getMessage()); - pe.initCause(e); + pe.initCause(e); throw pe; } - } - ) -| func=EvalFunction(funcType) - ) - "(" args=EvalArgs(over,specs,lp,input) ")" + } + "(" args=EvalArgs(over,specs,lp,input) ")" { if(null != func) { - funcName = func.getClass().getName(); - if(null != funcNameAlias) { - funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias); + if(null != pigContext.getFuncSpecFromAlias(funcName)) { + funcSpec = pigContext.getFuncSpecFromAlias(funcName); } else { - funcSpec = new FuncSpec(funcName); + funcName = func.getClass().getName(); + funcSpec = new FuncSpec(funcName); } byte type = DataType.BYTEARRAY; switch(funcType) { @@ -3303,7 +3300,7 @@ ExpressionOperator EvalFuncSpec(Schema o } userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, type); } else { - throw new ParseException("Could not instantiate function: " + funcNameAlias); + throw new ParseException("Could not instantiate function: " + funcName); } lp.add(userFunc); log.debug("EvalFuncSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp); @@ -3729,29 +3726,6 @@ Schema TypeTupleSchema() : // These the simple non-terminals that are shared across many -Object EvalFunction(byte funcType) : -{ - String funcName; - Object func = null; - log.trace("Entering EvalFunction"); -} -{ - funcName = QualifiedFunction() - { - func = pigContext.instantiateFuncFromAlias(funcName); - try{ - FunctionType.tryCasting(func, funcType); - }catch (Exception e){ - ParseException pe = new ParseException(e.getMessage()); - pe.initCause(e); - throw pe; - } - log.trace("Exiting EvalFunction"); - - return func; - } -} - String EvalClass(byte classType) : { String className; Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Jul 26 16:54:45 2010 @@ -17,6 +17,7 @@ */ package org.apache.pig.impl.util; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,7 +31,6 @@ import java.net.URLDecoder; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; @@ -39,11 +39,10 @@ import java.util.jar.JarInputStream; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.impl.PigContext; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.impl.PigContext; public class JarManager { @@ -117,11 +116,18 @@ public class JarManager { // log.error("Adding " + jarEntry.jar + ":" + jarEntry.prefix); mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents); } - for (int i = 0; i < pigContext.extraJars.size(); i++) { + for (String scriptJar: pigContext.scriptJars) { + mergeJar(jarFile, scriptJar, null, contents); + } + for (URL extraJar: pigContext.extraJars) { // log.error("Adding extra " + pigContext.extraJars.get(i)); - mergeJar(jarFile, pigContext.extraJars.get(i), null, contents); + mergeJar(jarFile, extraJar, null, contents); } - + for (int i = 0; i < pigContext.scriptFiles.size(); i++) { + String path = pigContext.scriptFiles.get(i); + addStream(jarFile, path, new FileInputStream(new File(path)),contents); + 
} + jarFile.putNextEntry(new ZipEntry("pigContext")); new ObjectOutputStream(jarFile).writeObject(pigContext); jarFile.close(); Added: hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=979361&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Jul 26 16:54:45 2010 @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.scripting; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.parser.QueryParser; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +/** + * Base class for various scripting implementations + */ +abstract public class ScriptEngine { + /** + * Pig supported scripting languages with their keywords + */ + private static final Map<String, String> supportedScriptLangs = new HashMap<String, String>(); + static { + // Python + supportedScriptLangs.put("jython", "org.apache.pig.scripting.jython.JythonScriptEngine"); + // Ruby + //supportedScriptLangs.put("jruby", "org.apache.pig.scripting.jruby.JrubyScriptEngine"); + } + public static final String namespaceSeparator = "."; + + /** + * registers the Jython functions as Pig functions with given namespace + * + * @param path path of the script + * @param namespace namespace for the functions + * @param pigContext pigcontext to register functions to pig in the given namespace + * @throws IOException + */ + public abstract void registerFunctions(String path, String namespace, PigContext pigContext) throws IOException; + + /** + * figure out the jar location from the class + * @param clazz + * @return the jar file location, null if the class was not loaded from a jar + * @throws FileNotFoundException + */ + protected static String getJarPath(Class<?> clazz) throws FileNotFoundException { + URL resource = clazz.getClassLoader().getResource(clazz.getCanonicalName().replace(".","/")+".class"); + if (resource.getProtocol().equals("jar")) { + return resource.getPath().substring(resource.getPath().indexOf(':')+1,resource.getPath().indexOf('!')); + } + throw new FileNotFoundException("Jar for "+ clazz.getName() +" class is not 
found"); + } + + /** + * get instance of the scriptEngine for the given scriptingLang + * + * @param scriptingLang ScriptEngine classname or keyword for the scriptingLang + * @return scriptengine for the given scripting language + * @throws IOException + */ + public static ScriptEngine getInstance(String scriptingLang) + throws IOException { + String scriptingEngine = scriptingLang; + try { + if(supportedScriptLangs.containsKey(scriptingLang)) { + scriptingEngine = supportedScriptLangs.get(scriptingLang); + } + return (ScriptEngine) Class.forName(scriptingEngine).newInstance(); + } catch (Exception e) { + throw new IOException("Could not load ScriptEngine: " + + scriptingEngine + ": " + e); + } + } +} Added: hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=979361&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java Mon Jul 26 16:54:45 2010 @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.scripting.jython; + +import java.io.IOException; + +import org.apache.pig.EvalFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.scripting.ScriptEngine; +import org.python.core.Py; +import org.python.core.PyBaseCode; +import org.python.core.PyException; +import org.python.core.PyFunction; +import org.python.core.PyObject; + +/** + * Python implementation of a Pig UDF Performs mappings between Python & Pig + * data structures + */ +public class JythonFunction extends EvalFunc<Object> { + private PyFunction function; + private Schema schema; + private int num_parameters; + private String scriptFilePath; + private String outputSchemaFunc; + + public JythonFunction(String filename, String functionName) throws IOException{ + PyFunction f; + boolean found = false; + + try { + f = JythonScriptEngine.getFunction(filename, functionName); + this.function = f; + num_parameters = ((PyBaseCode) f.func_code).co_argcount; + PyObject outputSchemaDef = f.__findattr__("outputSchema".intern()); + if (outputSchemaDef != null) { + this.schema = Utils.getSchemaFromString(outputSchemaDef.toString()); + found = true; + } + PyObject outputSchemaFunctionDef = f.__findattr__("outputSchemaFunction".intern()); + if (outputSchemaFunctionDef != null) { + if(found) { + throw new ExecException( + "multiple decorators for " + functionName); + } + scriptFilePath = filename; + outputSchemaFunc = outputSchemaFunctionDef.toString(); + this.schema = null; + found = true; + } + PyObject schemaFunctionDef = f.__findattr__("schemaFunction".intern()); + if (schemaFunctionDef != null) { + if(found) { + 
throw new ExecException( + "multiple decorators for " + functionName); + } + // We should not see these functions here + // BUG + throw new ExecException( + "unregistered " + functionName); + } + } catch (ParseException pe) { + throw new ExecException("Could not parse schema for script function " + pe); + } catch (IOException e) { + throw new IllegalStateException("Could not initialize: " + filename); + } catch (Exception e) { + throw new ExecException("Could not initialize: " + filename); + } + } + + @Override + public Object exec(Tuple tuple) throws IOException { + try { + if (tuple == null || num_parameters == 0) { + // ignore input tuple + PyObject out = function.__call__(); + return JythonUtils.pythonToPig(out); + } + else { + // this way we get the elements of the tuple as parameters instead + // of one tuple object + PyObject[] params = JythonUtils.pigTupleToPyTuple(tuple).getArray(); + return JythonUtils.pythonToPig(function.__call__(params)); + } + } catch (PyException e) { + throw new ExecException("Error executing function: " + e); + } catch (Exception e) { + throw new IOException("Error executing function: " + e); + } + } + + @Override + public Schema outputSchema(Schema input) { + if(schema != null) { + return schema; + } else { + if(outputSchemaFunc != null) { + PyFunction pf; + try { + pf = JythonScriptEngine.getFunction(scriptFilePath, outputSchemaFunc); + // this should be a schema function + PyObject schemaFunctionDef = pf.__findattr__("schemaFunction".intern()); + if(schemaFunctionDef == null) { + throw new IllegalStateException("Function: " + + outputSchemaFunc + " is not a schema function"); + } + return (Schema)((pf.__call__(Py.java2py(input))).__tojava__(Object.class)); + } catch (IOException ioe) { + throw new IllegalStateException("Could not find function: " + + outputSchemaFunc + "()"); + } + } else { + return new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY)); + } + } + } +} + Added: 
hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java?rev=979361&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java Mon Jul 26 16:54:45 2010 @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.scripting.jython; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.scripting.ScriptEngine; +import org.python.core.PyFunction; +import org.python.core.PyObject; +import org.python.core.PyStringMap; +import org.python.core.PyTuple; +import org.python.util.PythonInterpreter; + +/** + * Implementation of the script engine for Jython + */ +public class JythonScriptEngine extends ScriptEngine { + + // Decorators - + // "schemaFunction" + // "outputSchema" + // "outputSchemaFunction" + + /** + * Language Interpreter Uses static holder pattern + */ + private static class Interpreter { + static final PythonInterpreter interpreter = new PythonInterpreter(); + static volatile ArrayList<String> filesLoaded = new ArrayList<String>(); + + static synchronized void init(String path) throws IOException { + if (!filesLoaded.contains(path)) { + // attempt addition of schema decorator handler, fail silently + interpreter.exec("def outputSchema(schema_def):\n" + + " def decorator(func):\n" + + " func.outputSchema = schema_def\n" + + " return func\n" + + " return decorator\n\n"); + + interpreter.exec("def outputSchemaFunction(schema_def):\n" + + " def decorator(func):\n" + + " func.outputSchemaFunction = schema_def\n" + + " return func\n" + + " return decorator\n"); + + interpreter.exec("def schemaFunction(schema_def):\n" + + " def decorator(func):\n" + + " func.schemaFunction = schema_def\n" + + " return func\n" + + " return decorator\n\n"); + + InputStream is = null; + File file = new File(path); + if (file.exists()) { + is = new FileInputStream(file); + } + else { + is = 
Interpreter.class.getResourceAsStream("/" + path); + } + if (is != null) { + interpreter.execfile(is); + filesLoaded.add(path); + is.close(); + } else { + throw new IOException( + "Could not initialize interpreter with "+ path); + } + } + } + } + + @Override + public void registerFunctions(String path, String namespace, PigContext pigContext) + throws IOException{ + Interpreter.init(path); + pigContext.scriptJars.add(getJarPath(PythonInterpreter.class)); + PythonInterpreter pi = Interpreter.interpreter; + @SuppressWarnings("unchecked") + List<PyTuple> locals = (List<PyTuple>) ((PyStringMap) pi.getLocals()).items(); + if(namespace == null) { + namespace = ""; + } + else { + namespace = namespace + namespaceSeparator; + } + try { + for (PyTuple item : locals) { + String key = (String) item.get(0); + Object value = item.get(1); + FuncSpec funcspec = null; + if (!key.startsWith("__") && !key.equals("schemaFunction") + && !key.equals("outputSchema") + && !key.equals("outputSchemaFunction") + && (value instanceof PyFunction) + && (((PyFunction)value).__findattr__("schemaFunction".intern())== null)) { + PyObject obj = ((PyFunction)value).__findattr__("outputSchema".intern()); + if(obj != null) { + Utils.getSchemaFromString(obj.toString()); + } + funcspec = new FuncSpec(JythonFunction.class.getCanonicalName() + "('" + + path + "','" + key +"')"); + pigContext.registerFunction(namespace + key, funcspec); + } + } + } catch (ParseException pe) { + throw new IOException("Error parsing schema for script function from the decorator " + pe); + } + } + + public static PyFunction getFunction(String path, String functionName) throws IOException { + Interpreter.init(path); + return (PyFunction) Interpreter.interpreter.get(functionName); + } +} Added: hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java?rev=979361&view=auto 
============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java Mon Jul 26 16:54:45 2010 @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.scripting.jython; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.DefaultBagFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.python.core.Py; +import org.python.core.PyDictionary; +import org.python.core.PyFloat; +import org.python.core.PyInteger; +import org.python.core.PyList; +import org.python.core.PyLong; +import org.python.core.PyNone; +import org.python.core.PyObject; +import org.python.core.PyString; +import org.python.core.PyTuple; + +public class JythonUtils { + + private static TupleFactory tupleFactory = TupleFactory.getInstance(); + private static BagFactory bagFactory = DefaultBagFactory.getInstance(); + + @SuppressWarnings("unchecked") + public static Object pythonToPig(PyObject pyObject) throws ExecException { + try { + Object javaObj = null; + // Add code for all supported pig types here + // Tuple, bag, map, int, long, float, double, chararray, bytearray + if (pyObject instanceof PyTuple) { + PyTuple pyTuple = (PyTuple) pyObject; + Object[] tuple = new Object[pyTuple.size()]; + int i = 0; + for (PyObject tupleObject : pyTuple.getArray()) { + tuple[i++] = pythonToPig(tupleObject); + } + javaObj = tupleFactory.newTuple(Arrays.asList(tuple)); + } else if (pyObject instanceof PyList) { + DataBag list = bagFactory.newDefaultBag(); + for (PyObject bagTuple : ((PyList) pyObject).asIterable()) { + // In jython, list need not be a bag of tuples, as it is in case of pig + // So we fail with cast exception if we dont find tuples inside bag + // This is consistent with java udf (bag should be filled with tuples) + list.add((Tuple) pythonToPig(bagTuple)); + } + javaObj = list; + } else if (pyObject 
instanceof PyDictionary) { + Map<?, PyObject> map = Py.tojava(pyObject, Map.class); + Map<Object, Object> newMap = new HashMap<Object, Object>(); + for (Map.Entry<?, PyObject> entry : map.entrySet()) { + newMap.put(entry.getKey(), pythonToPig(entry.getValue())); + } + javaObj = newMap; + } else if (pyObject instanceof PyLong) { + javaObj = pyObject.__tojava__(Long.class); + } else if (pyObject instanceof PyInteger) { + javaObj = pyObject.__tojava__(Integer.class); + } else if (pyObject instanceof PyFloat) { + // J(P)ython is loosely typed, supports only float type, + // hence we convert everything to double to save precision + javaObj = pyObject.__tojava__(Double.class); + } else if (pyObject instanceof PyString) { + javaObj = pyObject.__tojava__(String.class); + } else if (pyObject instanceof PyNone) { + return null; + } else { + javaObj = pyObject.__tojava__(byte[].class); + // if we successfully converted to byte[] + if(javaObj instanceof byte[]) { + javaObj = new DataByteArray((byte[])javaObj); + } + else { + throw new ExecException("Non supported pig datatype found, cast failed"); + } + } + if(javaObj.equals(Py.NoConversion)) { + throw new ExecException("Cannot cast into any pig supported type"); + } + return javaObj; + } catch (Exception e) { + throw new ExecException("Cannot convert jython type to pig datatype "+ e); + } + } + + public static PyObject pigToPython(Object object) { + if (object instanceof Tuple) { + return pigTupleToPyTuple((Tuple) object); + } else if (object instanceof DataBag) { + PyList list = new PyList(); + for (Tuple bagTuple : (DataBag) object) { + list.add(pigTupleToPyTuple(bagTuple)); + } + return list; + } else if (object instanceof Map<?, ?>) { + PyDictionary newMap = new PyDictionary(); + for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) { + newMap.put(entry.getKey(), pigToPython(entry.getValue())); + } + return newMap; + } else if (object instanceof DataByteArray) { + return Py.java2py(((DataByteArray) object).get()); 
+ } else { + return Py.java2py(object); + } + } + + public static PyTuple pigTupleToPyTuple(Tuple tuple) { + PyObject[] pyTuple = new PyObject[tuple.size()]; + int i = 0; + for (Object object : tuple.getAll()) { + pyTuple[i++] = pigToPython(object); + } + return new PyTuple(pyTuple); + } + +} + Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Jul 26 16:54:45 2010 @@ -365,6 +365,19 @@ public class GruntParser extends PigScri protected void processRegister(String jar) throws IOException { mPigServer.registerJar(jar); } + + @Override + protected void processRegister(String path, String scriptingLang, String namespace) throws IOException, ParseException { + if(path.endsWith(".jar")) { + if(scriptingLang != null || namespace != null) { + throw new ParseException("Cannot register a jar with a scripting language or namespace"); + } + mPigServer.registerJar(path); + } + else { + mPigServer.registerCode(path, scriptingLang, namespace); + } + } private String runPreprocessor(String script, List<String> params, List<String> files) Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Mon Jul 26 16:54:45 2010 @@ -74,6 +74,8 @@ public abstract class 
PigScriptParser abstract protected void processExplain(String alias, String script, boolean isVerbose, String format, String target, List<String> params, List<String> files) throws IOException, ParseException; abstract protected void processRegister(String jar) throws IOException; + + abstract protected void processRegister(String path, String scriptingEngine, String namespace) throws IOException, ParseException; abstract protected void processSet(String key, String value) throws IOException, ParseException; @@ -152,6 +154,8 @@ TOKEN: {<MKDIR: "mkdir">} TOKEN: {<PWD: "pwd">} TOKEN: {<QUIT: "quit">} TOKEN: {<REGISTER: "register">} +TOKEN: {<USING: "using">} +TOKEN: {<AS: "as"> } TOKEN: {<REMOVE: "rm">} TOKEN: {<REMOVEFORCE: "rmf">} TOKEN: {<SET: "set">} @@ -372,8 +376,8 @@ TOKEN : { <QUOTEDSTRING : "'" "'"> } void parse() throws IOException: { - Token t1, t2; - String val = null; + Token t1, t2, t3; + String engine = null, namespace = null; List<String> cmdTokens = new ArrayList<String>(); } @@ -501,7 +505,15 @@ void parse() throws IOException: | <REGISTER> t1 = GetPath() - {processRegister(unquote(t1.image));} + [ + <USING> t2 = GetPath() + { engine = t2.image; } + [ + <AS> t3 = <IDENTIFIER> + {namespace = t3.image; } + ] + ] + {processRegister(unquote(t1.image), engine, namespace); } | Script() | @@ -701,6 +713,10 @@ Token GetReserved () : | t = <REGISTER> | + t = <USING> + | + t = <AS> + | t = <REMOVE> | t = <SET> Modified: hadoop/pig/trunk/test/findbugsExcludeFile.xml URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/findbugsExcludeFile.xml?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/test/findbugsExcludeFile.xml (original) +++ hadoop/pig/trunk/test/findbugsExcludeFile.xml Mon Jul 26 16:54:45 2010 @@ -407,4 +407,9 @@ <Field name = "rNums" /> <Bug pattern="MS_OOI_PKGPROTECT" /> </Match> + <Match> + <Class 
name="org.apache.pig.scripting.jython.JythonScriptEngine$Interpreter" /> + <Method name = "init" /> + <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> + </Match> </FindBugsFilter> Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=979361&r1=979360&r2=979361&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Mon Jul 26 16:54:45 2010 @@ -942,6 +942,32 @@ public class TestGrunt extends TestCase assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar"))); } + @Test + public void testRegisterScripts() throws Throwable { + String[] script = { + "#!/usr/bin/python", + "@outputSchema(\"x:{t:(num:long)}\")", + "def square(number):" , + "\treturn (number * number)" + }; + + Util.createLocalInputFile( "testRegisterScripts.py", script); + + PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigContext context = server.getPigContext(); + + String strCmd = "register testRegisterScripts.py using jython as pig\n"; + + ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); + InputStreamReader reader = new InputStreamReader(cmd); + + Grunt grunt = new Grunt(new BufferedReader(reader), context); + + grunt.exec(); + assertTrue(context.getFuncSpecFromAlias("pig.square") != null); + + } + /* @Test public void testScriptMissingLastNewLine() throws Throwable { PigServer server = new PigServer(ExecType.LOCAL); @@ -1077,5 +1103,5 @@ public class TestGrunt extends TestCase new Grunt(new BufferedReader(reader), pc).exec(); assertEquals("my.arbitrary.value", pc.getExecutionEngine().getConfiguration().getProperty("my.arbitrary.key")); - } + }*/ } Added: hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java?rev=979361&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java Mon Jul 26 16:54:45 2010 @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.util.Iterator; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.FileLocalizer; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +public class TestScriptUDF extends TestCase { + static MiniCluster cluster = MiniCluster.buildCluster(); + private PigServer pigServer; + + TupleFactory mTf = TupleFactory.getInstance(); + BagFactory mBf = BagFactory.getInstance(); + + @Before + @Override + public void setUp() throws Exception{ + FileLocalizer.setR(new Random()); + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + cluster.shutDown(); + } + + // See PIG-928 + @Test + public void testPythonStandardScript() throws Exception{ + String[] script = { + "#!/usr/bin/python", + "@outputSchema(\"x:{t:(num:long)}\")", + "def square(number):" , + "\treturn (number * number)" + }; + String[] input = { + "1\t3", + "2\t4", + "3\t5" + }; + + Util.createInputFile(cluster, "table_testPythonStandardScript", input); + Util.createLocalInputFile( "testPythonStandardScript.py", script); + + // Test the namespace + pigServer.registerCode("testPythonStandardScript.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonStandardScript' as (a0:long, a1:long);"); + pigServer.registerQuery("B = foreach A generate pig.square(a0);"); + + pigServer.registerCode("testPythonStandardScript.py", "jython", null); + pigServer.registerQuery("C = foreach A generate square(a0);"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + 
assertTrue(t.toString().equals("(1)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(4)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(9)")); + + iter = pigServer.openIterator("C"); + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(1)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(4)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(9)")); + } + + // See PIG-928 + @Test + public void testPythonScriptWithSchemaFunction() throws Exception{ + String[] script = { + "#!/usr/bin/python", + "@outputSchemaFunction(\"squareSchema\")", + "def square(number):" , + "\treturn (number * number)\n", + "@schemaFunction(\"square\")", + "def squareSchema(input):", + "\treturn input " + }; + String[] input = { + "1\t3.0", + "2\t4.0", + "3\t5.0" + }; + + Util.createInputFile(cluster, "table_testPythonScriptWithSchemaFunction", input); + Util.createLocalInputFile( "testPythonScriptWithSchemaFunction.py", script); + + // Test the namespace + pigServer.registerCode("testPythonScriptWithSchemaFunction.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonScriptWithSchemaFunction' as (a0:int, a1:double);"); + pigServer.registerQuery("B = foreach A generate pig.square(a0);"); + + pigServer.registerCode("testPythonScriptWithSchemaFunction.py", "jython", null); + pigServer.registerQuery("C = foreach A generate square(a1);"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.toString().equals("(1)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(4)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(9)")); + + // The same python function will operate on double and try to get square of double + // Since 
these are small double numbers we do not need to use delta to test the results + iter = pigServer.openIterator("C"); + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(9.0)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(16.0)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(25.0)")); + } + + // See PIG-928 + @Test + public void testPythonScriptUDFNoDecorator() throws Exception{ + String[] script = { + "#!/usr/bin/python", + // No decorator means schema is null - bytearray... + "def concat(word):" , + "\treturn word + word" + }; + String[] input = { + "hello\t1", + "pig\t2", + "world\t3" + }; + + Util.createInputFile(cluster, "table_testPythonScriptUDFNoDecorator", input); + Util.createLocalInputFile( "testPythonScriptUDFNoDecorator.py", script); + + pigServer.registerCode("testPythonScriptUDFNoDecorator.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFNoDecorator' as (a0, a1:int);"); + pigServer.registerQuery("B = foreach A generate pig.concat(a0);"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + // We need to check whether this is a DataByteArray or fail otherwise + if(!(t.get(0) instanceof DataByteArray)) { + fail("Default return type should be bytearray"); + } + + assertTrue(t.get(0).toString().trim().equals("hellohello")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.get(0).toString().trim().equals("pigpig")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.get(0).toString().trim().equals("worldworld")); + } + + @Test + public void testPythonScriptUDFBagInput() throws Exception{ + String[] script = { + "#!/usr/bin/python", + "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")", + "def collect(bag):" , + "\toutBag = []", + "\tfor word in bag:", + // We need to wrap word inside a tuple for pig + 
"\t\ttup=(len(bag), word[1])", + "\t\toutBag.append(tup)", + "\treturn outBag" + }; + String[] input = { + "1\thello", + "2\tpig", + "1\tworld", + "1\tprogram", + "2\thadoop" + }; + + Util.createInputFile(cluster, "table_testPythonScriptUDFBagInput", input); + Util.createLocalInputFile( "testPythonScriptUDFBagInput.py", script); + + pigServer.registerCode("testPythonScriptUDFBagInput.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFBagInput' as (a0:int, a1:chararray);"); + pigServer.registerQuery("B = group A by a0;"); + pigServer.registerQuery("C = foreach B generate pig.collect(A);"); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + DataBag bag; + Tuple tup; + bag = BagFactory.getInstance().newDefaultBag(); + tup = TupleFactory.getInstance().newTuple(); + tup.append(3); + tup.append("hello"); + bag.add(tup); + tup = TupleFactory.getInstance().newTuple(); + tup.append(3); + tup.append("world"); + bag.add(tup); + tup = TupleFactory.getInstance().newTuple(); + tup.append(3); + tup.append("program"); + bag.add(tup); + + assertTrue(t.get(0).toString().equals(bag.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + + bag = BagFactory.getInstance().newDefaultBag(); + tup = TupleFactory.getInstance().newTuple(); + tup.append(2); + tup.append("pig"); + bag.add(tup); + tup = TupleFactory.getInstance().newTuple(); + tup.append(2); + tup.append("hadoop"); + bag.add(tup); + + assertTrue(t.get(0).toString().equals(bag.toString())); + } + + @Test + public void testPythonScriptUDFMapInput() throws Exception{ + String[] script = { + "#!/usr/bin/python", + "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")", + "def maptobag(map):" , + "\toutBag = []", + "\tfor k, v in map.iteritems():", + // We need to wrap word inside a tuple for pig + "\t\ttup = (k, v)", + "\t\toutBag.append(tup)", + "\treturn outBag" + }; + String[] input = { + "[1#hello,2#world]", + 
"[3#pig,4#rocks]", + }; + + Util.createInputFile(cluster, "table_testPythonScriptUDFMapInput", input); + Util.createLocalInputFile( "testPythonScriptUDFMapInput.py", script); + + pigServer.registerCode("testPythonScriptUDFMapInput.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFMapInput' as (a0:map[]);"); + pigServer.registerQuery("B = foreach A generate pig.maptobag(a0);"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + DataBag bag; + Tuple tup; + bag = BagFactory.getInstance().newDefaultBag(); + tup = TupleFactory.getInstance().newTuple(); + tup.append(1); + tup.append("hello"); + bag.add(tup); + tup = TupleFactory.getInstance().newTuple(); + tup.append(2); + tup.append("world"); + bag.add(tup); + assertTrue(t.get(0).toString().equals(bag.toString())); + + assertTrue(iter.hasNext()); + t = iter.next(); + bag = BagFactory.getInstance().newDefaultBag(); + tup = TupleFactory.getInstance().newTuple(); + tup.append(3); + tup.append("pig"); + bag.add(tup); + tup = TupleFactory.getInstance().newTuple(); + tup.append(4); + tup.append("rocks"); + bag.add(tup); + assertTrue(t.get(0).toString().equals(bag.toString())); + + assertFalse(iter.hasNext()); + + } + + @Test + public void testPythonScriptUDFNullInputOutput() throws Exception { + String[] script = { + "#!/usr/bin/python", + "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")", + "def multStr(cnt, str):" , + "\tif cnt != None:", + "\t\treturn cnt * str", + "\telse:", + "\t\treturn None" + }; + String[] input = { + "3\thello", + // Null input + "\tworld", + }; + + Util.createInputFile(cluster, "table_testPythonScriptUDFNullInputOutput", input); + Util.createLocalInputFile( "testPythonScriptUDFNullInputOutput.py", script); + + pigServer.registerCode("testPythonScriptUDFNullInputOutput.py", "jython", "pig"); + pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFNullInputOutput' as (a0:int, a1:chararray);"); + 
pigServer.registerQuery("B = foreach A generate pig.multStr(a0, a1);"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.get(0).toString().equals("hellohellohello")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + // UDF takes null and returns null + assertTrue(t.get(0) == null); + + } +} +