Author: olga
Date: Mon Sep 29 14:46:05 2008
New Revision: 700270

URL: http://svn.apache.org/viewvc?rev=700270&view=rev
Log:
PIG-427: casting input to UDF

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java
Modified:
    incubator/pig/branches/types/CHANGES.txt
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Mon Sep 29 14:46:05 2008
@@ -259,3 +259,5 @@
     PIG-376: set job name (olgan)
 
     PIG-463: POCast changes (pradeepk via olgan)
+
+    PIG-427: casting input to UDFs

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
 Mon Sep 29 14:46:05 2008
@@ -126,4 +126,8 @@
         mFuncSpec = funcSpec;
     }
 
+    public void setMArgs(List<ExpressionOperator> args) {
+        mArgs = args;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 Mon Sep 29 14:46:05 2008
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +41,6 @@
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.*;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.data.DataType ;
@@ -56,11 +56,46 @@
  */
 public class TypeCheckingVisitor extends LOVisitor {
 
+    private static final int INF = -1;
+
     private static final Log log = 
LogFactory.getLog(TypeCheckingVisitor.class);
 
     private CompilationMessageCollector msgCollector = null ;
 
     private boolean strictMode = false ;
+    
+    public static MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
+    static{
+        //Ordering here decides the score for the best fit function.
+        //Do not change the order. Conversions to a smaller type is preferred
+        //over conversion to a bigger type where ordering of types is:
+        //INTEGER, LONG, FLOAT, DOUBLE, CHARARRAY, TUPLE, BAG, MAP
+        //from small to big
+//        castLookup.put(DataType.BOOLEAN, DataType.INTEGER);
+//        castLookup.put(DataType.BOOLEAN, DataType.LONG);
+//        castLookup.put(DataType.BOOLEAN, DataType.FLOAT);
+//        castLookup.put(DataType.BOOLEAN, DataType.DOUBLE);
+//        castLookup.put(DataType.BOOLEAN, DataType.CHARARRAY);
+        castLookup.put(DataType.INTEGER, DataType.LONG);
+        castLookup.put(DataType.INTEGER, DataType.FLOAT);
+        castLookup.put(DataType.INTEGER, DataType.DOUBLE);
+//        castLookup.put(DataType.INTEGER, DataType.CHARARRAY);
+        castLookup.put(DataType.LONG, DataType.FLOAT);
+        castLookup.put(DataType.LONG, DataType.DOUBLE);
+//        castLookup.put(DataType.LONG, DataType.CHARARRAY);
+        castLookup.put(DataType.FLOAT, DataType.DOUBLE);
+//        castLookup.put(DataType.FLOAT, DataType.CHARARRAY);
+//        castLookup.put(DataType.DOUBLE, DataType.CHARARRAY);
+//        castLookup.put(DataType.BYTEARRAY, DataType.BOOLEAN);
+        castLookup.put(DataType.BYTEARRAY, DataType.INTEGER);
+        castLookup.put(DataType.BYTEARRAY, DataType.LONG);
+        castLookup.put(DataType.BYTEARRAY, DataType.FLOAT);
+        castLookup.put(DataType.BYTEARRAY, DataType.DOUBLE);
+        castLookup.put(DataType.BYTEARRAY, DataType.CHARARRAY);
+        castLookup.put(DataType.BYTEARRAY, DataType.TUPLE);
+        castLookup.put(DataType.BYTEARRAY, DataType.BAG);
+        castLookup.put(DataType.BYTEARRAY, DataType.MAP);
+    }
 
     public TypeCheckingVisitor(LogicalPlan plan,
                         CompilationMessageCollector messageCollector) {
@@ -1190,28 +1225,40 @@
         } catch (Exception e) {
             throw new VisitorException(e);
         }
-        
-        if(funcSpecs != null) {
-            // check the if a FuncSpec matching our schema exists
-            FuncSpec matchingSpec = null;
-            for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); 
iterator.hasNext();) {
-                FuncSpec fs = iterator.next();
-                if(Schema.equals(s, fs.getInputArgsSchema(), false, true)) {
-                    matchingSpec = fs;
+        FuncSpec matchingSpec = null;
+        if(funcSpecs!=null && funcSpecs.size()!=0){
+            //Some function mappings found. Trying to see
+            //if one of them fits the input schema
+            if((matchingSpec = exactMatch(funcSpecs, s))==null){
+                //Oops, no exact match found. Trying to see if we
+                //have mappings that we can fit using casts.
+                if(byteArrayFound(s) && funcSpecs.size()!=1){
+                    //Oops, we have byte arrays and multiple mappings.
+                    //Throw exception that we can't infer a fit.
+                    String msg = "Could not infer the matching function for " 
+ func.getFuncSpec() + " as multiple of them were found to match " + 
s.toString() + ". Please use an explicit cast." ;
+                    msgCollector.collect(msg, MessageType.Error);
+                    throw new VisitorException(msg) ;
+                }
+                else if((matchingSpec=bestFitMatch(funcSpecs,s))==null){
+                    //Either no byte arrays found or there are byte arrays
+                    //but only one mapping exists.
+                    //However, we could not find a match as there were either
+                    //none fitting the input schema or it was ambiguous.
+                    //Throw exception that we can't infer a fit.
+                    String msg = "Could not infer the matching function for " 
+ func.getFuncSpec() + " as multiple or none of them fit. Please use an 
explicit cast." ;
+                    msgCollector.collect(msg, MessageType.Error);
+                    throw new VisitorException(msg) ;
                 }
-                
-            }
-            if(matchingSpec == null) {
-                StringBuilder sb = new StringBuilder();
-                sb.append(func.getFuncSpec());
-                sb.append(" does not work with inputs of type ");
-                sb.append(s);
-                throw new VisitorException(sb.toString());
-            } else {
-                func.setFuncSpec(matchingSpec);
             }
         }
-
+        if(matchingSpec!=null){
+            //Voila! We have a fitting match. Lets insert casts and make
+            //it work.
+            func.setFuncSpec(matchingSpec);
+            insertCastsForUDF(func, s, matchingSpec.getInputArgsSchema());
+        }
+            
+        //Regenerate schema as there might be new additions
         try {
             func.regenerateFieldSchema();
         } catch (FrontendException fee) {
@@ -1222,6 +1269,159 @@
             throw new VisitorException(msg) ;
         }
     }
+    
+    /**
+     * Tries to find the schema supported by one of funcSpecs which can
+     * be obtained by inserting a set of casts to the input schema
+     * @param funcSpecs - mappings provided by udf
+     * @param s - input schema
+     * @return the funcSpec that supports the schema that is best suited
+     *          to s. The best suited schema is one that has the
+     *          lowest score as returned by fitPossible().
+     */
+    private FuncSpec bestFitMatch(List<FuncSpec> funcSpecs, Schema s) {
+        FuncSpec matchingSpec = null;
+        long score = INF;
+        long prevBestScore = Long.MAX_VALUE;
+        long bestScore = Long.MAX_VALUE;
+        for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); 
iterator.hasNext();) {
+            FuncSpec fs = iterator.next();
+            score = fitPossible(s,fs.getInputArgsSchema());
+            if(score!=INF && score<=bestScore){
+                matchingSpec = fs;
+                prevBestScore = bestScore;
+                bestScore = score;
+            }
+        }
+        if(matchingSpec!=null && bestScore!=prevBestScore)
+            return matchingSpec;
+        
+        return null;
+    }
+    
+    /**
+     * Checks to see if any field of the input schema is a byte array
+     * @param s - input schema
+     * @return true if found else false
+     * @throws VisitorException
+     */
+    private boolean byteArrayFound(Schema s) throws VisitorException {
+        for(int i=0;i<s.size();i++){
+            try {
+                FieldSchema fs=s.getField(i);
+                if(fs.type==DataType.BYTEARRAY){
+                    return true;
+                }
+            } catch (ParseException e) {
+                throw new VisitorException(e);
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * Finds if there is an exact match between the schema supported by
+     * one of the funcSpecs and the input schema s
+     * @param funcSpecs - mappings provided by udf
+     * @param s - input schema
+     * @return the matching spec if found else null
+     */
+    private FuncSpec exactMatch(List<FuncSpec> funcSpecs, Schema s) {
+        FuncSpec matchingSpec = null;
+        for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); 
iterator.hasNext();) {
+            FuncSpec fs = iterator.next();
+            if(Schema.equals(s, fs.getInputArgsSchema(), false, true)) {
+                matchingSpec = fs;
+                break;
+            }
+        }
+        return matchingSpec;
+    }
+    
+    /**
+     * Computes a modified version of manhattan distance between 
+     * the two schemas: s1 & s2. Here the value on the same axis
+     * are preferred over values that change axis as this means
+     * that the number of casts required will be lesser on the same
+     * axis. 
+     * 
+     * However, this function ceases to be a metric as the triangle
+     * inequality does not hold.
+     * 
+     * Each schema is an s1.size() dimensional vector.
+     * The ordering for each axis is as defined by castLookup. 
+     * Unallowed casts are returned a dist of INFINITY.
+     * @param s1
+     * @param s2
+     * @return
+     */
+    private long fitPossible(Schema s1, Schema s2) {
+        if(s1==null || s2==null) return INF;
+        List<FieldSchema> sFields = s1.getFields();
+        List<FieldSchema> fsFields = s2.getFields();
+        if(sFields.size()!=fsFields.size())
+            return INF;
+        long score = 0;
+        int castCnt=0;
+        for(int i=0;i<sFields.size();i++){
+            FieldSchema sFS = sFields.get(i);
+            FieldSchema fsFS = fsFields.get(i);
+            
+            if(DataType.isSchemaType(sFS.type)){
+                if(!FieldSchema.equals(sFS, fsFS, false, true))
+                    return INF;
+            }
+            if(FieldSchema.equals(sFS, fsFS, true, true)) continue;
+            if(!castLookup.containsKey(sFS.type))
+                return INF;
+            if(!(castLookup.get(sFS.type).contains(fsFS.type)))
+                return INF;
+            score += ((List)castLookup.get(sFS.type)).indexOf(fsFS.type) + 1;
+            ++castCnt;
+        }
+        return score * castCnt;
+    }
+    
+    private void insertCastsForUDF(LOUserFunc udf, Schema fromSch, Schema 
toSch) {
+        List<FieldSchema> fsLst = fromSch.getFields();
+        List<FieldSchema> tsLst = toSch.getFields();
+        List<ExpressionOperator> args = udf.getArguments();
+        List<ExpressionOperator> newArgs = new 
ArrayList<ExpressionOperator>(args.size());
+        int i=-1;
+        for (FieldSchema fFSch : fsLst) {
+            ++i;
+            FieldSchema tFSch = tsLst.get(i); 
+            if(fFSch.type==tFSch.type) {
+                newArgs.add(args.get(i));
+                continue;
+            }
+            collectCastWarning(udf,
+                    fFSch.type,
+                    tFSch.type);
+            LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan();
+            /*List<LogicalOperator> list = currentPlan.getPredecessors(udf);
+            if (list == null) {
+                throw new AssertionError("No input for " + udf.getClass());
+            }*/
+            // All uniOps at the moment only work with Expression input
+            ExpressionOperator input = args.get(i);
+            OperatorKey newKey = genNewOperatorKey(udf);
+            LOCast cast = new LOCast(currentPlan, newKey, input, tFSch.type);
+            currentPlan.add(cast);
+            currentPlan.disconnect(input, udf);
+            try {
+                currentPlan.connect(input, cast);
+                currentPlan.connect(cast, udf);
+            } catch (PlanException ioe) {
+                AssertionError err = new AssertionError(
+                        "Explicit casting insertion");
+                err.initCause(ioe);
+                throw err;
+            }
+        }
+        udf.setMArgs(newArgs);
+
+    }
 
     /**
      * For Bincond, lhsOp and rhsOp must have the same output type

Modified: 
incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java?rev=700270&r1=700269&r2=700270&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/EquivalenceClasses.java 
Mon Sep 29 14:46:05 2008
@@ -36,7 +36,6 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.util.IdentityHashSet;
 
-import com.sun.tools.jdi.EventSetImpl.Itr;
 
 //These methods are used to generate equivalence classes given the operator 
name and the output from the operator
 //For example, it gives out 2 eq. classes for filter, one that passes the 
filter and one that doesn't

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java?rev=700270&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java 
(added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBestFitCast.java 
Mon Sep 29 14:46:05 2008
@@ -0,0 +1,176 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
+
+import junit.framework.TestCase;
+
+public class TestBestFitCast extends TestCase {
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    private File tmpFile;
+    
+    public TestBestFitCast() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+//        pigServer = new PigServer(ExecType.LOCAL);
+        int LOOP_SIZE = 20;
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        long l = 0;
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            ps.println(l + "\t" + i);
+        }
+        ps.close();
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+    
+    public static class UDF1 extends EvalFunc<Tuple>{
+        /**
+         * java level API
+         * @param input expects a single numeric DataAtom value
+         * @param output returns a single numeric DataAtom value, cosine value 
of the argument
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return input;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+         */
+        @Override
+        public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+            List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+            funcList.add(new FuncSpec(this.getClass().getName(), new 
Schema(Arrays.asList(new Schema.FieldSchema(null, DataType.FLOAT),new 
Schema.FieldSchema(null, DataType.FLOAT)))));
+            funcList.add(new FuncSpec(this.getClass().getName(), new 
Schema(Arrays.asList(new Schema.FieldSchema(null, DataType.LONG),new 
Schema.FieldSchema(null, DataType.DOUBLE)))));
+            funcList.add(new FuncSpec(this.getClass().getName(), new 
Schema(new Schema.FieldSchema(null, DataType.FLOAT))));
+            funcList.add(new FuncSpec(this.getClass().getName(), new 
Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
+            funcList.add(new FuncSpec(this.getClass().getName(), new 
Schema(new Schema.FieldSchema(null, DataType.DOUBLE))));
+            /*funcList.add(new FuncSpec(DoubleMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
+            funcList.add(new FuncSpec(FloatMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
+            funcList.add(new FuncSpec(IntMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
+            funcList.add(new FuncSpec(LongMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
+            funcList.add(new FuncSpec(StringMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));*/
+            return funcList;
+        }    
+
+    }
+    
+    @Test
+    public void test1() throws Exception{
+        //Passing (long, int)
+        //Possible matches: (float, float) , (long, double)
+        //Chooses (long, double) as it has only one cast compared to two for 
(float, float)
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, 
y:int);");
+        pigServer.registerQuery("B = FOREACH A generate x, " + 
UDF1.class.getName() + "(x,y);");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        if(!iter.hasNext()) fail("No Output received");
+        int cnt = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Long);
+            assertEquals(true,((Tuple)t.get(1)).get(1) instanceof Double);
+            ++cnt;
+        }
+        assertEquals(20, cnt);
+    }
+    
+    @Test
+    public void test2() throws Exception{
+        //Passing (int, int)
+        //Possible matches: (float, float) , (long, double)
+        //Throws Exception as ambiguous definitions found
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, 
y:int);");
+        pigServer.registerQuery("B = FOREACH A generate x, " + 
UDF1.class.getName() + "(y,y);");
+        try{
+            pigServer.openIterator("B");
+        }catch (Exception e) {
+            String msg = e.getMessage();
+            assertEquals(true,msg.contains("as multiple or none of them fit"));
+        }
+        
+    }
+    
+    @Test
+    public void test3() throws Exception{
+        //Passing (int, int)
+        //Possible matches: (float, float) , (long, double)
+        //Chooses (float, float) as both options lead to same score and 
(float, float) occurs first.
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, 
y:int);");
+        pigServer.registerQuery("B = FOREACH A generate x, " + 
UDF1.class.getName() + "((float)y,(float)y);");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        if(!iter.hasNext()) fail("No Output received");
+        int cnt = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Float);
+            assertEquals(true,((Tuple)t.get(1)).get(1) instanceof Float);
+            ++cnt;
+        }
+        assertEquals(20, cnt);
+    }
+    
+    @Test
+    public void test4() throws Exception{
+        //Passing (long)
+        //Possible matches: (float), (integer), (double)
+        //Chooses (float) as it leads to a better score that to (double)
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:long, 
y:int);");
+        pigServer.registerQuery("B = FOREACH A generate x, " + 
UDF1.class.getName() + "(x);");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        if(!iter.hasNext()) fail("No Output received");
+        int cnt = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            assertEquals(true,((Tuple)t.get(1)).get(0) instanceof Float);
+            ++cnt;
+        }
+        assertEquals(20, cnt);
+    }
+    
+    @Test
+    public void test5() throws Exception{
+        //Passing bytearrays
+        //Possible matches: (float, float) , (long, double)
+        //Throws exception since more than one funcSpec and inp is bytearray
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = FOREACH A generate $0, " + 
UDF1.class.getName() + "($1,$1);");
+        try{
+            pigServer.openIterator("B");
+        }catch (Exception e) {
+            String msg = e.getMessage();
+            assertEquals(true,msg.contains("multiple of them were found to 
match"));
+        }
+        
+    }
+}


Reply via email to