Author: daijy
Date: Mon Jul 26 16:54:45 2010
New Revision: 979361

URL: http://svn.apache.org/viewvc?rev=979361&view=rev
Log:
PIG-928: UDFs in scripting languages

Added:
    hadoop/pig/trunk/src/org/apache/pig/scripting/
    hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
    hadoop/pig/trunk/src/org/apache/pig/scripting/jython/
    hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java
    hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
    hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/ivy.xml
    hadoop/pig/trunk/ivy/libraries.properties
    hadoop/pig/trunk/ivy/pig.pom
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/trunk/test/findbugsExcludeFile.xml
    hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jul 26 16:54:45 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-928: UDFs in scripting languages (daijy)
+
 PIG-1509: Add .gitignore file (cwsteinbach via gates)
 
 PIG-1478: Add progress notification listener to PigRunner API (rding)

Modified: hadoop/pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Mon Jul 26 16:54:45 2010
@@ -174,6 +174,7 @@
        <fileset 
file="${ivy.lib.dir}/jackson-mapper-asl-${jackson.version}.jar"/>
        <fileset file="${ivy.lib.dir}/jackson-core-asl-${jackson.version}.jar"/>
         <fileset file="${ivy.lib.dir}/joda-time-${joda-time.version}.jar" />
+               <fileset file="${ivy.lib.dir}/jython-${jython.version}.jar" />
        <!-- <fileset file="${lib.dir}/commons-collections-3.2.jar" />  -->
     </path>
 

Modified: hadoop/pig/trunk/ivy.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy.xml?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy.xml (original)
+++ hadoop/pig/trunk/ivy.xml Mon Jul 26 16:54:45 2010
@@ -72,6 +72,7 @@
     <dependency org="org.codehaus.jackson" name="jackson-core-asl" 
rev="${jackson.version}"
       conf="compile->master"/>
     <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" 
conf="compile->master"/>
+       <dependency org="org.python" name="jython" rev="${jython.version}" 
conf="compile->master"/>
     <!--ATM hbase, hbase-test.jar, hadoop.jar are resolved from the lib dir--> 
        
     </dependencies>
 </ivy-module>

Modified: hadoop/pig/trunk/ivy/libraries.properties
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/libraries.properties (original)
+++ hadoop/pig/trunk/ivy/libraries.properties Mon Jul 26 16:54:45 2010
@@ -36,3 +36,4 @@ xerces.version=1.4.4
 
 jackson.version=1.0.1
 joda-time.version=1.6
+jython.version=2.5.0

Modified: hadoop/pig/trunk/ivy/pig.pom
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/pig.pom?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/pig.pom (original)
+++ hadoop/pig/trunk/ivy/pig.pom Mon Jul 26 16:54:45 2010
@@ -95,5 +95,11 @@
         <version>${joda-time.version}</version>
     </dependency>
 
+       <dependency>
+           <groupId>org.python</groupId>
+           <artifactId>jython</artifactId>
+           <version>${jython.version}</version>
+       </dependency>
+
   </dependencies> 
 </project>

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Jul 26 16:54:45 2010
@@ -96,6 +96,7 @@ import org.apache.pig.impl.util.LogUtils
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.pen.ExampleGenerator;
+import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -468,6 +469,31 @@ public class PigServer {
     }
     
     /**
+     * Registers a script file containing UDFs (universal scripting language support, see PIG-928)
+     * 
+     * @param path path of the script file
+     * @param scriptingLang language keyword or scriptingEngine used to 
interpret the script
+     * @param namespace namespace defined for functions of this script
+     * @throws IOException
+     */
+    public void registerCode(String path, String scriptingLang, String 
namespace)
+    throws IOException {
+        File f = new File(path);
+
+        if (!f.canRead()) {
+            int errCode = 4002;
+            String msg = "Can't read file: " + path;
+            throw new FrontendException(msg, errCode,
+                    PigException.USER_ENVIRONMENT);
+        }
+        if(scriptingLang != null) {
+            ScriptEngine se = ScriptEngine.getInstance(scriptingLang);
+            se.registerFunctions(path, namespace, pigContext);
+        }
+        pigContext.addScriptFile(path);
+    }
+    
+    /**
      * Register a query with the Pig runtime. The query is parsed and 
registered, but it is not
      * executed until it is needed.
      * 
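
For context, a minimal usage sketch of the new registerCode() API (not part of this commit; the script name "myudfs.py", its square() function, and the 'input' data path are illustrative placeholders):

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class RegisterCodeExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // register the Jython script under the namespace "myudfs"
            pig.registerCode("myudfs.py", "jython", "myudfs");
            pig.registerQuery("A = LOAD 'input' AS (x:long);");
            pig.registerQuery("B = FOREACH A GENERATE myudfs.square(x);");
            Iterator<Tuple> it = pig.openIterator("B");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }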

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Mon Jul 26 16:54:45 2010
@@ -221,7 +221,9 @@ public class POUserFunc extends Expressi
                     }
                 }
                 if(resultType == DataType.BYTEARRAY) {
-                    if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
+                    // This is needed when an EvalFunc whose declared type is bytearray
+                    // returns arbitrary objects; script EvalFuncs, which run UDFs written
+                    // in scripting languages, behave this way.
+                    if(result.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
                         result.result = new DataByteArray(result.result.toString().getBytes());
                     }
                 }
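
A small sketch of the coercion this hunk introduces (illustrative only, not code from the commit): when the declared result type is bytearray but the UDF returned some other object, the value is wrapped via its string form.

    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.DataType;

    public class ByteArrayCoercionSketch {
        // Mirrors the check above: non-bytearray results from a UDF typed as
        // bytearray are converted through their string representation.
        static Object coerce(Object udfResult) throws Exception {
            if (udfResult != null && DataType.findType(udfResult) != DataType.BYTEARRAY) {
                return new DataByteArray(udfResult.toString().getBytes());
            }
            return udfResult;
        }

        public static void main(String[] args) throws Exception {
            // the Integer 42 becomes a DataByteArray holding "42"
            System.out.println(coerce(Integer.valueOf(42)));
        }
    }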

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Jul 26 
16:54:45 2010
@@ -90,6 +90,12 @@ public class PigContext implements Seria
    
     private Properties properties;
     
+    //  script files that are needed to run a job
+    public List<String> scriptFiles = new ArrayList<String>();
+    
+    //  script jars that are needed to run a script - jython.jar etc
+    public List<String> scriptJars = new ArrayList<String>(2);
+    
     /**
      * a table mapping function names to function specs.
      */
@@ -205,6 +211,12 @@ public class PigContext implements Seria
         }
     }
     
+    public void addScriptFile(String path) throws MalformedURLException {
+        if (path != null) {
+            scriptFiles.add(path);
+        }
+    }
+    
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();
@@ -381,7 +393,6 @@ public class PigContext implements Seria
     
     
     
-    
 
     /**
      * Creates a Classloader based on the passed jarFile and any extra jar 
files.

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Mon Jul 26 16:54:45 2010
@@ -3256,7 +3256,6 @@ ExpressionOperator EvalFuncSpec(Schema o
 {
        String funcName = null; 
        FuncSpec funcSpec = null; 
-       String funcNameAlias = null; 
     boolean registeredFunction = false;
        List<ExpressionOperator> args;
        ExpressionOperator userFunc;
@@ -3265,30 +3264,28 @@ ExpressionOperator EvalFuncSpec(Schema o
        log.trace("Entering EvalFuncSpec");
 }
 {
-       (
-    (
-    LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) 
funcNameAlias=QualifiedFunction()
-    {
-               func = pigContext.instantiateFuncFromAlias(funcNameAlias);
-               try{
-            FunctionType.tryCasting(func, funcType);
-               } catch (Exception e){
+       // get the func object
+       funcName = QualifiedFunction()
+       {
+               // look into defined functions map and try casting
+               // This supports namespace.func, jarname.package.func scenarios
+           try{
+               func = pigContext.instantiateFuncFromAlias(funcName);
+               FunctionType.tryCasting(func, funcType);
+               }catch (Exception e){
                        ParseException pe = new ParseException(e.getMessage());
-                       pe.initCause(e); 
+                       pe.initCause(e);
                        throw pe;
                }
-    }
-    )
-|   func=EvalFunction(funcType)
-    )
-    "(" args=EvalArgs(over,specs,lp,input) ")" 
+       }
+       "(" args=EvalArgs(over,specs,lp,input) ")" 
        {
                if(null != func) {
-            funcName = func.getClass().getName();
-            if(null != funcNameAlias) {
-                funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
+            if(null != pigContext.getFuncSpecFromAlias(funcName)) {
+                funcSpec = pigContext.getFuncSpecFromAlias(funcName);
             } else {
-                funcSpec = new FuncSpec(funcName);
+               funcName = func.getClass().getName();
+               funcSpec = new FuncSpec(funcName);
             }
             byte type = DataType.BYTEARRAY;
             switch(funcType) {
@@ -3303,7 +3300,7 @@ ExpressionOperator EvalFuncSpec(Schema o
             }
                        userFunc = new LOUserFunc(lp, new OperatorKey(scope, 
getNextId()), funcSpec, type);
         } else {
-            throw new ParseException("Could not instantiate function: " + 
funcNameAlias);
+            throw new ParseException("Could not instantiate function: " + 
funcName);
         }
                lp.add(userFunc);
                log.debug("EvalFuncSpec: Added operator " + 
userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
@@ -3729,29 +3726,6 @@ Schema TypeTupleSchema() : 
 
 // These the simple non-terminals that are shared across many
 
-Object  EvalFunction(byte funcType) : 
-{
-       String funcName;
-    Object func = null;
-       log.trace("Entering EvalFunction");
-}
-{
-       funcName = QualifiedFunction()
-       {
-        func = pigContext.instantiateFuncFromAlias(funcName);
-               try{
-            FunctionType.tryCasting(func, funcType);
-               }catch (Exception e){
-                       ParseException pe = new ParseException(e.getMessage());
-                       pe.initCause(e);
-                       throw pe;
-               }
-               log.trace("Exiting EvalFunction");
-               
-               return func;
-       }
-}
-
 String EvalClass(byte classType) :
 {
        String className;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Jul 26 
16:54:45 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.impl.util;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,7 +31,6 @@ import java.net.URLDecoder;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
@@ -39,11 +39,10 @@ import java.util.jar.JarInputStream;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-//import 
org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.impl.PigContext;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.impl.PigContext;
 
 
 public class JarManager {
@@ -117,11 +116,18 @@ public class JarManager {
             // log.error("Adding " + jarEntry.jar + ":" + jarEntry.prefix);
             mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
         }
-        for (int i = 0; i < pigContext.extraJars.size(); i++) {
+        for (String scriptJar: pigContext.scriptJars) {
+            mergeJar(jarFile, scriptJar, null, contents);
+        }
+        for (URL extraJar: pigContext.extraJars) {
             // log.error("Adding extra " + pigContext.extraJars.get(i));
-            mergeJar(jarFile, pigContext.extraJars.get(i), null, contents);
+            mergeJar(jarFile, extraJar, null, contents);
         }
-
+        for (int i = 0; i < pigContext.scriptFiles.size(); i++) {
+               String path = pigContext.scriptFiles.get(i);
+               addStream(jarFile, path, new FileInputStream(new 
File(path)),contents);
+        }
+        
         jarFile.putNextEntry(new ZipEntry("pigContext"));
         new ObjectOutputStream(jarFile).writeObject(pigContext);
         jarFile.close();

Added: hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java?rev=979361&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/scripting/ScriptEngine.java Mon Jul 26 
16:54:45 2010
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Base class for various scripting implementations
+ */
+abstract public class ScriptEngine {
+    /**
+     * Pig supported scripting languages with their keywords
+     */
+    private static final Map<String, String> supportedScriptLangs = new 
HashMap<String, String>();
+    static {
+        // Python
+        supportedScriptLangs.put("jython", 
"org.apache.pig.scripting.jython.JythonScriptEngine");
+        // Ruby
+        //supportedScriptLangs.put("jruby", 
"org.apache.pig.scripting.jruby.JrubyScriptEngine");
+    }
+    public static final String namespaceSeparator = ".";
+    
+    /**
+     * Registers the script's functions as Pig functions in the given namespace
+     * 
+     * @param path path of the script
+     * @param namespace namespace for the functions
+     * @param pigContext PigContext in which to register the functions
+     * @throws IOException
+     */
+    public abstract void registerFunctions(String path, String namespace, 
PigContext pigContext) throws IOException;
+
+    /** 
+     * figure out the jar location from the class 
+     * @param clazz
+     * @return the jar file location, null if the class was not loaded from a 
jar
+     * @throws FileNotFoundException 
+     */
+    protected static String getJarPath(Class<?> clazz) throws 
FileNotFoundException {
+        URL resource = 
clazz.getClassLoader().getResource(clazz.getCanonicalName().replace(".","/")+".class");
+        if (resource.getProtocol().equals("jar")) {
+            return 
resource.getPath().substring(resource.getPath().indexOf(':')+1,resource.getPath().indexOf('!'));
+        }
+        throw new FileNotFoundException("Jar for "+ clazz.getName() +" class 
is not found");
+    }
+
+    /**
+     * get instance of the scriptEngine for the given scriptingLang
+     * 
+     * @param scriptingLang ScriptEngine classname or keyword for the 
scriptingLang
+     * @return scriptengine for the given scripting language
+     * @throws IOException
+     */
+    public static ScriptEngine getInstance(String scriptingLang)
+    throws IOException {
+        String scriptingEngine = scriptingLang;
+        try {
+            if(supportedScriptLangs.containsKey(scriptingLang)) {
+                scriptingEngine = supportedScriptLangs.get(scriptingLang);
+            }
+            return (ScriptEngine) Class.forName(scriptingEngine).newInstance();
+        } catch (Exception e) {
+            throw new IOException("Could not load ScriptEngine: "
+                    + scriptingEngine + ": " + e);
+        }
+    }
+}
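
A short sketch of how a caller might obtain an engine (the script path and namespace are placeholders; "jython" is the only keyword registered in this commit):

    import java.io.IOException;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.scripting.ScriptEngine;

    public class ScriptEngineSketch {
        public static void registerJythonScript(PigContext pigContext) throws IOException {
            // "jython" resolves through supportedScriptLangs above; a fully
            // qualified ScriptEngine class name would also be accepted.
            ScriptEngine engine = ScriptEngine.getInstance("jython");
            engine.registerFunctions("myudfs.py", "myudfs", pigContext);
        }
    }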

Added: hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=979361&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java 
(added)
+++ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonFunction.java 
Mon Jul 26 16:54:45 2010
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jython;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.python.core.Py;
+import org.python.core.PyBaseCode;
+import org.python.core.PyException;
+import org.python.core.PyFunction;
+import org.python.core.PyObject;
+
+/**
+ * Python implementation of a Pig UDF. Performs mappings between Python and
+ * Pig data structures.
+ */
+public class JythonFunction extends EvalFunc<Object> {
+    private PyFunction function;
+    private Schema schema;
+    private int num_parameters;
+    private String scriptFilePath;
+    private String outputSchemaFunc;
+    
+    public JythonFunction(String filename, String functionName) throws 
IOException{
+        PyFunction f;
+        boolean found = false;
+
+        try {
+            f = JythonScriptEngine.getFunction(filename, functionName);
+            this.function = f;
+            num_parameters = ((PyBaseCode) f.func_code).co_argcount;
+            PyObject outputSchemaDef = f.__findattr__("outputSchema".intern());
+            if (outputSchemaDef != null) {
+                this.schema = 
Utils.getSchemaFromString(outputSchemaDef.toString());
+                found = true;
+            }
+            PyObject outputSchemaFunctionDef = 
f.__findattr__("outputSchemaFunction".intern());
+            if (outputSchemaFunctionDef != null) {
+                if(found) {
+                    throw new ExecException(
+                            "multiple decorators for " + functionName);
+                }
+                scriptFilePath = filename;
+                outputSchemaFunc = outputSchemaFunctionDef.toString();
+                this.schema = null;
+                found = true;
+            }
+            PyObject schemaFunctionDef = 
f.__findattr__("schemaFunction".intern());
+            if (schemaFunctionDef != null) {
+                if(found) {
+                    throw new ExecException(
+                            "multiple decorators for " + functionName);
+                }
+                // We should not see these functions here
+                // BUG
+                throw new ExecException(
+                        "unregistered " + functionName);
+            }
+        } catch (ParseException pe) {
+            throw new ExecException("Could not parse schema for script 
function " + pe);
+        } catch (IOException e) {
+            throw new IllegalStateException("Could not initialize: " + 
filename);
+        } catch (Exception e) {
+            throw new ExecException("Could not initialize: " + filename);
+        }
+    }
+
+    @Override
+    public Object exec(Tuple tuple) throws IOException {
+        try {
+            if (tuple == null || num_parameters == 0) {
+                // ignore input tuple
+                PyObject out = function.__call__();
+                return JythonUtils.pythonToPig(out);
+            }
+            else {
+                // this way we get the elements of the tuple as parameters 
instead
+                // of one tuple object
+                PyObject[] params = 
JythonUtils.pigTupleToPyTuple(tuple).getArray();
+                return JythonUtils.pythonToPig(function.__call__(params));
+            }
+        } catch (PyException e) {
+            throw new ExecException("Error executing function: " + e);
+        } catch (Exception e) {
+            throw new IOException("Error executing function: " + e);
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        if(schema != null) {
+            return schema;
+        } else {
+            if(outputSchemaFunc != null) {
+                PyFunction pf;
+                try {
+                    pf = JythonScriptEngine.getFunction(scriptFilePath, 
outputSchemaFunc);
+                    // this should be a schema function
+                    PyObject schemaFunctionDef = 
pf.__findattr__("schemaFunction".intern());
+                    if(schemaFunctionDef == null) {
+                        throw new IllegalStateException("Function: "
+                                + outputSchemaFunc + " is not a schema 
function");
+                    }
+                    return 
(Schema)((pf.__call__(Py.java2py(input))).__tojava__(Object.class));
+                } catch (IOException ioe) {
+                    throw new IllegalStateException("Could not find function: "
+                        + outputSchemaFunc + "()");
+                }
+            } else {
+                return new Schema(new Schema.FieldSchema(null, 
DataType.BYTEARRAY));
+            }
+        }
+    }
+}
+
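
A sketch of the EvalFunc contract this class implements. Pig normally instantiates it from a FuncSpec such as "org.apache.pig.scripting.jython.JythonFunction('myudfs.py','square')"; constructing it directly here is purely illustrative, and the script and function names are placeholders.

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.scripting.jython.JythonFunction;

    public class JythonFunctionSketch {
        public static void main(String[] args) throws Exception {
            EvalFunc<Object> square = new JythonFunction("myudfs.py", "square");
            Tuple in = TupleFactory.getInstance().newTuple(1);
            in.set(0, 3L);
            // the elements of the tuple are passed as individual parameters
            System.out.println(square.exec(in));   // 9, if square() multiplies its input by itself
        }
    }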

Added: 
hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java?rev=979361&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java 
(added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonScriptEngine.java 
Mon Jul 26 16:54:45 2010
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jython;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.scripting.ScriptEngine;
+import org.python.core.PyFunction;
+import org.python.core.PyObject;
+import org.python.core.PyStringMap;
+import org.python.core.PyTuple;
+import org.python.util.PythonInterpreter;
+
+/**
+ * Implementation of the script engine for Jython
+ */
+public class JythonScriptEngine extends ScriptEngine {
+    
+    // Decorators -
+    // "schemaFunction"
+    // "outputSchema"
+    // "outputSchemaFunction"
+    
+    /**
+     * Language interpreter. Uses the static holder pattern.
+     */
+    private static class Interpreter {
+        static final PythonInterpreter interpreter = new PythonInterpreter();
+        static volatile ArrayList<String> filesLoaded = new 
ArrayList<String>();
+        
+        static synchronized void init(String path) throws IOException {
+            if (!filesLoaded.contains(path)) {
+                // attempt addition of schema decorator handler, fail silently
+                interpreter.exec("def outputSchema(schema_def):\n"
+                        + "    def decorator(func):\n"
+                        + "        func.outputSchema = schema_def\n"
+                        + "        return func\n" 
+                        + "    return decorator\n\n");
+
+                interpreter.exec("def outputSchemaFunction(schema_def):\n"
+                        + "    def decorator(func):\n"
+                        + "        func.outputSchemaFunction = schema_def\n"
+                        + "        return func\n"
+                        + "    return decorator\n");
+                
+                interpreter.exec("def schemaFunction(schema_def):\n"
+                        + "     def decorator(func):\n"
+                        + "         func.schemaFunction = schema_def\n"    
+                        + "         return func\n"
+                        + "     return decorator\n\n");
+
+                InputStream is = null;
+                File file = new File(path);
+                if (file.exists()) {
+                    is = new FileInputStream(file);
+                }
+                else {
+                    is = Interpreter.class.getResourceAsStream("/" + path);
+                }
+                if (is != null) {
+                    interpreter.execfile(is);
+                    filesLoaded.add(path);
+                    is.close();
+                } else {
+                    throw new IOException(
+                    "Could not initialize interpreter with "+ path);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void registerFunctions(String path, String namespace, PigContext 
pigContext)
+    throws IOException{
+        Interpreter.init(path);
+        pigContext.scriptJars.add(getJarPath(PythonInterpreter.class));
+        PythonInterpreter pi = Interpreter.interpreter;
+        @SuppressWarnings("unchecked")
+        List<PyTuple> locals = (List<PyTuple>) ((PyStringMap) 
pi.getLocals()).items();
+        if(namespace == null) {
+            namespace = "";
+        }
+        else {
+            namespace = namespace + namespaceSeparator;
+        }
+        try {
+            for (PyTuple item : locals) {
+                String key = (String) item.get(0);
+                Object value = item.get(1);
+                FuncSpec funcspec = null;
+                if (!key.startsWith("__") && !key.equals("schemaFunction")
+                        && !key.equals("outputSchema")
+                        && !key.equals("outputSchemaFunction")
+                        && (value instanceof PyFunction)
+                        && 
(((PyFunction)value).__findattr__("schemaFunction".intern())== null)) {
+                    PyObject obj = 
((PyFunction)value).__findattr__("outputSchema".intern());
+                    if(obj != null) {
+                        Utils.getSchemaFromString(obj.toString());
+                    }
+                    funcspec = new 
FuncSpec(JythonFunction.class.getCanonicalName() + "('"
+                            + path + "','" + key +"')");
+                    pigContext.registerFunction(namespace + key, funcspec);
+                }
+            }
+        } catch (ParseException pe) {
+            throw new IOException("Error parsing schema for script function 
from the decorator " + pe);
+        }
+    }
+
+    public static PyFunction getFunction(String path, String functionName) 
throws IOException {
+        Interpreter.init(path);
+        return (PyFunction) Interpreter.interpreter.get(functionName);
+    }
+}
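
A sketch of the static getFunction() helper (the script path and function name are placeholders):

    import org.apache.pig.scripting.jython.JythonScriptEngine;
    import org.python.core.Py;
    import org.python.core.PyFunction;
    import org.python.core.PyObject;

    public class GetFunctionSketch {
        public static void main(String[] args) throws Exception {
            // loads (and caches) the script in the shared interpreter,
            // then looks up the named function
            PyFunction square = JythonScriptEngine.getFunction("myudfs.py", "square");
            PyObject result = square.__call__(Py.java2py(4));
            System.out.println(result);   // 16, assuming square() squares its argument
        }
    }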

Added: hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java?rev=979361&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java 
(added)
+++ hadoop/pig/trunk/src/org/apache/pig/scripting/jython/JythonUtils.java Mon 
Jul 26 16:54:45 2010
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.scripting.jython;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.python.core.Py;
+import org.python.core.PyDictionary;
+import org.python.core.PyFloat;
+import org.python.core.PyInteger;
+import org.python.core.PyList;
+import org.python.core.PyLong;
+import org.python.core.PyNone;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyTuple;
+
+public class JythonUtils {
+
+    private static TupleFactory tupleFactory = TupleFactory.getInstance();
+    private static BagFactory bagFactory = DefaultBagFactory.getInstance();
+
+    @SuppressWarnings("unchecked")
+    public static Object pythonToPig(PyObject pyObject) throws ExecException {
+        try {
+            Object javaObj = null;
+            // Add code for all supported pig types here
+            // Tuple, bag, map, int, long, float, double, chararray, bytearray 
+            if (pyObject instanceof PyTuple) {
+                PyTuple pyTuple = (PyTuple) pyObject;
+                Object[] tuple = new Object[pyTuple.size()];
+                int i = 0;
+                for (PyObject tupleObject : pyTuple.getArray()) {
+                    tuple[i++] = pythonToPig(tupleObject);
+                }
+                javaObj = tupleFactory.newTuple(Arrays.asList(tuple));
+            } else if (pyObject instanceof PyList) {
+                DataBag list = bagFactory.newDefaultBag();
+                for (PyObject bagTuple : ((PyList) pyObject).asIterable()) {
+                    // In Jython a list need not contain only tuples, as a bag does in Pig,
+                    // so we fail with a cast exception if we don't find tuples inside the bag.
+                    // This is consistent with Java UDFs (a bag should be filled with tuples).
+                    list.add((Tuple) pythonToPig(bagTuple));
+                }
+                javaObj = list;
+            } else if (pyObject instanceof PyDictionary) {
+                Map<?, PyObject> map = Py.tojava(pyObject, Map.class);
+                Map<Object, Object> newMap = new HashMap<Object, Object>();
+                for (Map.Entry<?, PyObject> entry : map.entrySet()) {
+                    newMap.put(entry.getKey(), pythonToPig(entry.getValue()));
+                }
+                javaObj = newMap;
+            } else if (pyObject instanceof PyLong) {
+                javaObj = pyObject.__tojava__(Long.class);
+            } else if (pyObject instanceof PyInteger) {
+                javaObj = pyObject.__tojava__(Integer.class);
+            } else if (pyObject instanceof PyFloat) {
+                // Jython is loosely typed and exposes only a float type,
+                // so we convert to double to preserve precision
+                javaObj = pyObject.__tojava__(Double.class);
+            } else if (pyObject instanceof PyString) {
+                javaObj = pyObject.__tojava__(String.class);
+            } else if (pyObject instanceof PyNone) {
+                return null;
+            } else {
+                javaObj = pyObject.__tojava__(byte[].class);
+                // if we successfully converted to byte[]
+                if(javaObj instanceof byte[]) {
+                    javaObj = new DataByteArray((byte[])javaObj);
+                }
+                else {
+                    throw new ExecException("Non supported pig datatype found, 
cast failed");
+                }
+            }
+            if(javaObj.equals(Py.NoConversion)) {
+                throw new ExecException("Cannot cast into any pig supported 
type");
+            }
+            return javaObj;
+        } catch (Exception e) {
+            throw new ExecException("Cannot convert jython type to pig 
datatype "+ e);
+        }
+    }
+
+    public static PyObject pigToPython(Object object) {
+        if (object instanceof Tuple) {
+            return pigTupleToPyTuple((Tuple) object);
+        } else if (object instanceof DataBag) {
+            PyList list = new PyList();
+            for (Tuple bagTuple : (DataBag) object) {
+                list.add(pigTupleToPyTuple(bagTuple));
+            }
+            return list;
+        } else if (object instanceof Map<?, ?>) {
+            PyDictionary newMap = new PyDictionary();
+            for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
+                newMap.put(entry.getKey(), pigToPython(entry.getValue()));
+            }
+            return newMap;
+        } else if (object instanceof DataByteArray) {
+            return Py.java2py(((DataByteArray) object).get());
+        } else {
+            return Py.java2py(object);
+        }
+    }
+
+    public static PyTuple pigTupleToPyTuple(Tuple tuple) {
+        PyObject[] pyTuple = new PyObject[tuple.size()];
+        int i = 0;
+        for (Object object : tuple.getAll()) {
+            pyTuple[i++] = pigToPython(object);
+        }
+        return new PyTuple(pyTuple);
+    }
+
+}
+
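
A round-trip sketch of the type mappings above (illustrative only):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.scripting.jython.JythonUtils;
    import org.python.core.PyTuple;

    public class JythonUtilsSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(2);
            t.set(0, 5L);
            t.set(1, "pig");
            PyTuple py = JythonUtils.pigTupleToPyTuple(t);   // Pig tuple -> Jython tuple
            Object back = JythonUtils.pythonToPig(py);       // and back to a Pig Tuple
            System.out.println(back);                        // (5,pig)
        }
    }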

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Jul 26 
16:54:45 2010
@@ -365,6 +365,19 @@ public class GruntParser extends PigScri
     protected void processRegister(String jar) throws IOException {
         mPigServer.registerJar(jar);
     }
+    
+    @Override
+    protected void processRegister(String path, String scriptingLang, String 
namespace) throws IOException, ParseException {
+        if(path.endsWith(".jar")) {
+            if(scriptingLang != null || namespace != null) {
+                throw new ParseException("Cannot register a jar with a 
scripting language or namespace");
+            }
+            mPigServer.registerJar(path);
+        }
+        else {
+            mPigServer.registerCode(path, scriptingLang, namespace);
+        }
+    }    
 
     private String runPreprocessor(String script, List<String> params, 
                                    List<String> files) 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj 
Mon Jul 26 16:54:45 2010
@@ -74,6 +74,8 @@ public abstract class PigScriptParser
        abstract protected void processExplain(String alias, String script, 
boolean isVerbose, String format, String target, List<String> params, 
List<String> files) throws IOException, ParseException;
        
        abstract protected void processRegister(String jar) throws IOException;
+       
+       abstract protected void processRegister(String path, String 
scriptingEngine, String namespace) throws IOException, ParseException;
 
        abstract protected void processSet(String key, String value) throws 
IOException, ParseException;
                
@@ -152,6 +154,8 @@ TOKEN: {<MKDIR: "mkdir">}
 TOKEN: {<PWD: "pwd">}
 TOKEN: {<QUIT: "quit">}
 TOKEN: {<REGISTER: "register">}
+TOKEN: {<USING: "using">}
+TOKEN: {<AS: "as"> }
 TOKEN: {<REMOVE: "rm">}
 TOKEN: {<REMOVEFORCE: "rmf">}
 TOKEN: {<SET: "set">}
@@ -372,8 +376,8 @@ TOKEN : { <QUOTEDSTRING :  "'"
       "'"> }
 void parse() throws IOException:
 {
-       Token t1, t2;
-       String val = null;
+       Token t1, t2, t3;
+       String engine = null, namespace = null;
        List<String> cmdTokens = new ArrayList<String>();
 }
 
@@ -501,7 +505,15 @@ void parse() throws IOException:
        |
        <REGISTER>
        t1 = GetPath()
-       {processRegister(unquote(t1.image));}
+       [
+       <USING> t2 = GetPath()
+       { engine = t2.image; }
+               [
+                       <AS> t3 = <IDENTIFIER>
+                       {namespace = t3.image; }
+               ]
+       ]       
+       {processRegister(unquote(t1.image), engine, namespace); }
        |
        Script()
        |
@@ -701,6 +713,10 @@ Token GetReserved () :
        |
        t = <REGISTER>
        |
+       t = <USING>
+       |
+       t = <AS>
+       |
        t = <REMOVE>
        |
        t = <SET>

Modified: hadoop/pig/trunk/test/findbugsExcludeFile.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/findbugsExcludeFile.xml?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/trunk/test/findbugsExcludeFile.xml Mon Jul 26 16:54:45 2010
@@ -407,4 +407,9 @@
         <Field name = "rNums" />
         <Bug pattern="MS_OOI_PKGPROTECT" />
     </Match>
+    <Match>
+        <Class 
name="org.apache.pig.scripting.jython.JythonScriptEngine$Interpreter" />
+        <Method name = "init" />
+        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+    </Match>
 </FindBugsFilter>

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=979361&r1=979360&r2=979361&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Mon Jul 26 
16:54:45 2010
@@ -942,6 +942,32 @@ public class TestGrunt extends TestCase 
         
assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
     }
     
+    @Test
+    public void testRegisterScripts() throws Throwable {
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchema(\"x:{t:(num:long)}\")",
+                "def square(number):" ,
+                "\treturn (number * number)"
+        };
+        
+        Util.createLocalInputFile( "testRegisterScripts.py", script);
+        
+        PigServer server = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+        PigContext context = server.getPigContext();
+
+        String strCmd = "register testRegisterScripts.py using jython as 
pig\n";
+
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+        grunt.exec();
+        assertTrue(context.getFuncSpecFromAlias("pig.square") != null);
+
+    }
+    /*
     @Test    
     public void testScriptMissingLastNewLine() throws Throwable {   
         PigServer server = new PigServer(ExecType.LOCAL);
@@ -1077,5 +1103,5 @@ public class TestGrunt extends TestCase 
         new Grunt(new BufferedReader(reader), pc).exec();
         
         assertEquals("my.arbitrary.value",  
pc.getExecutionEngine().getConfiguration().getProperty("my.arbitrary.key"));
-    }
+    }*/
 }

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java?rev=979361&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestScriptUDF.java Mon Jul 26 
16:54:45 2010
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScriptUDF extends TestCase {
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+    
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+    
+    // See PIG-928
+    @Test
+    public void testPythonStandardScript() throws Exception{
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchema(\"x:{t:(num:long)}\")",
+                "def square(number):" ,
+                "\treturn (number * number)"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+
+        Util.createInputFile(cluster, "table_testPythonStandardScript", input);
+        Util.createLocalInputFile( "testPythonStandardScript.py", script);
+
+        // Test the namespace
+        pigServer.registerCode("testPythonStandardScript.py", "jython", "pig");
+        pigServer.registerQuery("A = LOAD 'table_testPythonStandardScript' as 
(a0:long, a1:long);");
+        pigServer.registerQuery("B = foreach A generate pig.square(a0);");
+
+        pigServer.registerCode("testPythonStandardScript.py", "jython", null);
+        pigServer.registerQuery("C = foreach A generate square(a0);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.toString().equals("(1)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(4)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(9)"));
+
+        iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(1)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(4)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(9)"));
+    }
+
+    // See PIG-928
+    @Test
+    public void testPythonScriptWithSchemaFunction() throws Exception{
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchemaFunction(\"squareSchema\")",
+                "def square(number):" ,
+                "\treturn (number * number)\n",
+                "@schemaFunction(\"square\")",
+                "def squareSchema(input):",
+                "\treturn input "
+        };
+        String[] input = {
+                "1\t3.0",
+                "2\t4.0",
+                "3\t5.0"
+        };
+
+        Util.createInputFile(cluster, 
"table_testPythonScriptWithSchemaFunction", input);
+        Util.createLocalInputFile( "testPythonScriptWithSchemaFunction.py", 
script);
+
+        // Test the namespace
+        pigServer.registerCode("testPythonScriptWithSchemaFunction.py", 
"jython", "pig");
+        pigServer.registerQuery("A = LOAD 
'table_testPythonScriptWithSchemaFunction' as (a0:int, a1:double);");
+        pigServer.registerQuery("B = foreach A generate pig.square(a0);");
+
+        pigServer.registerCode("testPythonScriptWithSchemaFunction.py", 
"jython", null);
+        pigServer.registerQuery("C = foreach A generate square(a1);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.toString().equals("(1)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(4)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(9)"));
+
+        // The same python function will operate on double and try to get 
square of double
+        // Since these are small double numbers we do not need to use delta to 
test the results
+        iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(9.0)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(16.0)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.toString().equals("(25.0)"));
+    }
+
+    // See PIG-928
+    @Test
+    public void testPythonScriptUDFNoDecorator() throws Exception{
+        String[] script = {
+                "#!/usr/bin/python",
+                // No decorator means schema is null - bytearray...
+                "def concat(word):" ,
+                "\treturn word + word"
+        };
+        String[] input = {
+                "hello\t1",
+                "pig\t2",
+                "world\t3"
+        };
+
+        Util.createInputFile(cluster, "table_testPythonScriptUDFNoDecorator", 
input);
+        Util.createLocalInputFile( "testPythonScriptUDFNoDecorator.py", 
script);
+
+        pigServer.registerCode("testPythonScriptUDFNoDecorator.py", "jython", 
"pig");
+        pigServer.registerQuery("A = LOAD 
'table_testPythonScriptUDFNoDecorator' as (a0, a1:int);");
+        pigServer.registerQuery("B = foreach A generate pig.concat(a0);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        // We need to check whether this is a DataByteArray or fail otherwise
+        if(!(t.get(0) instanceof DataByteArray)) {
+            fail("Default return type should be bytearray");
+        }
+
+        assertTrue(t.get(0).toString().trim().equals("hellohello"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.get(0).toString().trim().equals("pigpig"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.get(0).toString().trim().equals("worldworld"));
+    }
+    
+    @Test
+    public void testPythonScriptUDFBagInput() throws Exception{
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")",
+                "def collect(bag):" ,
+                "\toutBag = []",
+                "\tfor word in bag:",
+                // We need to wrap word inside a tuple for pig
+                "\t\ttup=(len(bag), word[1])",
+                "\t\toutBag.append(tup)",
+                "\treturn outBag"
+        };
+        String[] input = {
+                "1\thello",
+                "2\tpig",
+                "1\tworld",
+                "1\tprogram",
+                "2\thadoop"
+        };
+
+        Util.createInputFile(cluster, "table_testPythonScriptUDFBagInput", 
input);
+        Util.createLocalInputFile( "testPythonScriptUDFBagInput.py", script);
+
+        pigServer.registerCode("testPythonScriptUDFBagInput.py", "jython", 
"pig");
+        pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFBagInput' 
as (a0:int, a1:chararray);");
+        pigServer.registerQuery("B = group A by a0;");
+        pigServer.registerQuery("C = foreach B generate pig.collect(A);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        DataBag bag; 
+        Tuple tup;
+        bag = BagFactory.getInstance().newDefaultBag();
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(3);
+        tup.append("hello");
+        bag.add(tup);
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(3);
+        tup.append("world");
+        bag.add(tup);
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(3);
+        tup.append("program");
+        bag.add(tup);
+    
+        assertTrue(t.get(0).toString().equals(bag.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        bag = BagFactory.getInstance().newDefaultBag();
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(2);
+        tup.append("pig");
+        bag.add(tup);
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(2);
+        tup.append("hadoop");
+        bag.add(tup);
+        
+        assertTrue(t.get(0).toString().equals(bag.toString()));
+    }
+    
+    @Test
+    public void testPythonScriptUDFMapInput() throws Exception{
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")",
+                "def maptobag(map):" ,
+                "\toutBag = []",
+                "\tfor k, v in map.iteritems():",
+                // We need to wrap word inside a tuple for pig
+                "\t\ttup = (k, v)",
+                "\t\toutBag.append(tup)",
+                "\treturn outBag"
+        };
+        String[] input = {
+                "[1#hello,2#world]",
+                "[3#pig,4#rocks]",
+        };
+
+        Util.createInputFile(cluster, "table_testPythonScriptUDFMapInput", 
input);
+        Util.createLocalInputFile( "testPythonScriptUDFMapInput.py", script);
+
+        pigServer.registerCode("testPythonScriptUDFMapInput.py", "jython", 
"pig");
+        pigServer.registerQuery("A = LOAD 'table_testPythonScriptUDFMapInput' 
as (a0:map[]);");
+        pigServer.registerQuery("B = foreach A generate pig.maptobag(a0);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        DataBag bag; 
+        Tuple tup;
+        bag = BagFactory.getInstance().newDefaultBag();
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(1);
+        tup.append("hello");
+        bag.add(tup);
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(2);
+        tup.append("world");
+        bag.add(tup);
+        assertTrue(t.get(0).toString().equals(bag.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        bag = BagFactory.getInstance().newDefaultBag();
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(3);
+        tup.append("pig");
+        bag.add(tup);
+        tup = TupleFactory.getInstance().newTuple();
+        tup.append(4);
+        tup.append("rocks");
+        bag.add(tup);
+        assertTrue(t.get(0).toString().equals(bag.toString()));
+        
+        assertFalse(iter.hasNext());
+        
+    }
+    
+    @Test
+    public void testPythonScriptUDFNullInputOutput() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "@outputSchema(\"bag:{(y:{t:(word:chararray)}}\")",
+                "def multStr(cnt, str):" ,
+                "\tif cnt != None:",
+                "\t\treturn cnt * str",
+                "\telse:",
+                "\t\treturn None"
+        };
+        String[] input = {
+                "3\thello",
+                // Null input
+                "\tworld",
+        };
+        
+        Util.createInputFile(cluster, 
"table_testPythonScriptUDFNullInputOutput", input);
+        Util.createLocalInputFile( "testPythonScriptUDFNullInputOutput.py", 
script);
+
+        pigServer.registerCode("testPythonScriptUDFNullInputOutput.py", 
"jython", "pig");
+        pigServer.registerQuery("A = LOAD 
'table_testPythonScriptUDFNullInputOutput' as (a0:int, a1:chararray);");
+        pigServer.registerQuery("B = foreach A generate pig.multStr(a0, a1);");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.get(0).toString().equals("hellohellohello"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        // UDF takes null and returns null
+        assertTrue(t.get(0) == null);
+        
+    }
+}
+

