Author: thejas
Date: Thu Sep  2 00:01:56 2010
New Revision: 991772

URL: http://svn.apache.org/viewvc?rev=991772&view=rev
Log:
PIG-1572: change default datatype when relations are used as scalar to 
bytearray 

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Sep  2 00:01:56 2010
@@ -193,6 +193,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1572: change default datatype when relations are used as scalar to 
bytearray (thejas)
+
 PIG-1583: piggybank unit test TestLookupInFiles is broken (daijy)
 
 PIG-1563: some of string functions don't work on bytearrays (olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Sep  2 00:01:56 2010
@@ -63,6 +63,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -634,7 +635,13 @@ public class PigServer {
         try {
             LogicalPlan lp = getPlanFromAlias(alias, "describe");
             lp = compileLp(alias, false);
-            Schema schema = lp.getLeaves().get(0).getSchema();
+            Schema schema = null;
+            for(LogicalOperator lo : lp.getLeaves()){
+                if(lo.getAlias().equals(alias)){
+                    schema = lo.getSchema();
+                    break;
+                }
+            }
             if (schema != null) System.out.println(alias + ": " + 
schema.toString());    
             else System.out.println("Schema for " + alias + " unknown.");
             return schema;
@@ -896,6 +903,7 @@ public class PigServer {
             if( 
pigContext.getProperties().getProperty("pig.usenewlogicalplan", 
"true").equals("true") ) {
                 LogicalPlanMigrationVistor migrator = new 
LogicalPlanMigrationVistor(lp);
                 migrator.visit();
+                migrator.finish();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan 
= migrator.getNewLogicalPlan();
                 
                 HashSet<String> optimizerRules = null;
@@ -1235,7 +1243,7 @@ public class PigServer {
                 }
                 else {
                     // add new store
-                    FuncSpec funcSpec = new 
FuncSpec(PigStorage.class.getName() + "()");
+                    FuncSpec funcSpec = new 
FuncSpec(InterStorage.class.getName());
                     fileSpec = new 
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), funcSpec);
                     store = new LOStore(referredPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)),
                             fileSpec, alias);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Thu Sep  2 00:01:56 2010
@@ -467,4 +467,8 @@ public class POUserFunc extends Expressi
             ((Accumulator)func).cleanup();
         }        
     }
+    
+    public void setResultType(byte resultType) {
+        this.resultType = resultType;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Thu Sep  
2 00:01:56 2010
@@ -17,17 +17,16 @@
  */
 package org.apache.pig.impl.builtin;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * ReadScalars reads a line from a file and returns it as its value. The
@@ -35,10 +34,10 @@ import org.apache.pig.impl.logicalLayer.
  * This is useful for incorporating a result from an agregation into another
  * evaluation.
  */
-public class ReadScalars extends EvalFunc<String> {
+public class ReadScalars extends EvalFunc<Object> {
     private String scalarfilename = null;
-    private String charset = "UTF-8";
-    private String value = null;
+  //  private String charset = "UTF-8";
+    private Object value = null;
 
     /**
      * Java level API
@@ -48,52 +47,44 @@ public class ReadScalars extends EvalFun
      *            read
      */
     @Override
-    public String exec(Tuple input) throws IOException {
+    public Object exec(Tuple input) throws IOException {
         if (value == null) {
             if (input == null || input.size() == 0)
                 return null;
 
-            InputStream is;
-            BufferedReader reader;
             int pos;
+            ReadToEndLoader loader;
             try {
                 pos = DataType.toInteger(input.get(0));
                 scalarfilename = DataType.toString(input.get(1));
-
-                is = FileLocalizer.openDFSFile(scalarfilename);
-                reader = new BufferedReader(new InputStreamReader(is, 
charset));
+                loader = new ReadToEndLoader(
+                        new InterStorage(), 
+                        UDFContext.getUDFContext().getJobConf(),
+                        scalarfilename, 0
+                );
             } catch (Exception e) {
                 throw new ExecException("Failed to open file '" + 
scalarfilename
                         + "'; error = " + e.getMessage());
             }
             try {
-                String line = reader.readLine();
-                if(line == null) {
-                    log.warn("No scalar field to read, returning null");
-                    return null;
-                }
-                String[] lineTok = line.split("\t");
-                if(pos > lineTok.length) {
+                Tuple t1 = loader.getNext();
+                if(t1 == null){
                     log.warn("No scalar field to read, returning null");
                     return null;
                 }
-                value = lineTok[pos];
-                if(reader.readLine() != null) {
-                    throw new ExecException("Scalar has more than one row in 
the output");
+                value = t1.get(pos);
+                Tuple t2 = loader.getNext();
+                if(t2 != null){
+                    String msg = "Scalar has more than one row in the output. 
" 
+                        + "1st : " + t1 + ", 2nd :" + t2;
+                    throw new ExecException(msg);   
                 }
+                
             } catch (Exception e) {
                 throw new ExecException(e.getMessage());
-            } finally {
-                reader.close();
-                is.close();
-            }
+            } 
         }
         return value;
     }
 
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(getSchemaName("ReadScalars", 
input),
-                DataType.CHARARRAY));
-    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Thu 
Sep  2 00:01:56 2010
@@ -26,6 +26,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 
@@ -56,10 +57,20 @@ public class LOUserFunc extends Expressi
         return mFuncSpec;
     }
     
+    /**
+     * Used for scalar alias. return the source logical operator
+     * this ReadScalars udf is associated with 
+     * @return input logical operator
+     */
     public LogicalOperator getImplicitReferencedOperator() {
         return implicitReferencedOperator;
     }
 
+    /**
+     * Used for scalar alias. set the source logical operator
+     * this ReadScalars udf is associated with 
+     * @param implicitReferencedOperator input logical operator
+     */
     public void setImplicitReferencedOperator(
             LogicalOperator implicitReferencedOperator) {
         this.implicitReferencedOperator = implicitReferencedOperator;
@@ -95,6 +106,33 @@ public class LOUserFunc extends Expressi
     @Override
     public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
+            
+            if(implicitReferencedOperator != null ){
+                // if this is a ReadScalars udf for scalar operation, use the 
+                // FieldSchema corresponding to this position in input 
+                List<ExpressionOperator> args = getArguments();
+                if(args != null && args.size() > 0 ){
+                    int pos = 
(Integer)((LOConst)getArguments().get(0)).getValue();
+                    LogicalOperator inp = implicitReferencedOperator;
+                    if(inp.getSchema() != null){
+                        // input logical operator has schema, copy and link
+                        //to corresponding FieldSchema
+                        FieldSchema inpFs = inp.getSchema().getField(pos);
+                        mFieldSchema = Schema.FieldSchema.copyAndLink(inpFs, 
inp);
+                    }else{
+                        // no schema for input logicaloperator, use bytearray
+                        // and set it as parent
+                        mFieldSchema = new FieldSchema(null, 
DataType.BYTEARRAY);
+                        mFieldSchema.setParent(null, inp);
+                    }
+                    mIsFieldSchemaComputed = true;
+                    return mFieldSchema;
+                }else{
+                    //predecessors haven't been setup, return null
+                    return null;
+                }
+            }
+            
             Schema inputSchema = new Schema();
             List<ExpressionOperator> args = getArguments();
             for(ExpressionOperator op: args) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Thu Sep  2 00:01:56 2010
@@ -770,6 +770,27 @@ public class QueryParser {
     static String constructFileNameSignature(String fileName, FuncSpec 
funcSpec) {
         return fileName+"_"+funcSpec.toString();
     }
+    
+    ExpressionOperator attachColPosToReadScalar(LogicalPlan lp, 
+            ExpressionOperator expr, int colNum, Schema over) 
+    throws PlanException, FrontendException{
+    
+        scalarFound = false;
+        // We also need to attach LOConst to the userfunc 
+        // so that it can read that projection number in ReadScalars UDF
+        LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), 
colNum);
+        rconst.setType(DataType.INTEGER);
+        lp.add(rconst);
+        lp.connect(rconst, expr);
+
+        if(over != null && over.getField(colNum).type != DataType.BYTEARRAY) {
+            LOCast loCast = new LOCast(lp, new OperatorKey(scope, 
getNextId()), over.getField(colNum).type);
+            lp.add(loCast);
+            lp.connect(expr, loCast);
+            expr = loCast;
+        } 
+        return expr;
+    }    
 }
 
 
@@ -4268,51 +4289,29 @@ ExpressionOperator DollarVar(Schema over
             }
         }
         // Scalar Projections
-               if(bracketed && eOp instanceof LOUserFunc && 
((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
-                       try {
-                               scalarFound = false;
-                               // Projections decides type of scalar, we need 
to add a Cast operator to track that
-                               LOCast loCast = null;
-                               if(over != null) {
-                                       if(over.getField(colNum).type != 
DataType.BYTEARRAY) {
-                                               loCast = new LOCast(lp, new 
OperatorKey(scope, getNextId()), over.getField(colNum).type);
-                                       } 
-                               } 
-                               if(loCast == null){
-                                       // Default type is chararray not 
bytearray for ReadScalar, as it reads string from the file
-                                       loCast = new LOCast(lp, new 
OperatorKey(scope, getNextId()), DataType.CHARARRAY);
-                               }
-                               lp.add(loCast);
-                               lp.connect(eOp, loCast);
-                               
-                               // We also need to attach LOConst to the 
userfunc 
-                               // so that it can read that projection number 
in ReadScalars UDF
-                               LOConst rconst = new LOConst(lp, new 
OperatorKey(scope, getNextId()), colNum);
-                rconst.setType(DataType.INTEGER);
-                lp.add(rconst);
-                lp.connect(rconst, eOp);
-                
-                return loCast;
-                       } catch(Exception e) {
-                               throw new ParseException("Invalid field in 
scalar" + e);
-                       }
-               }
-               ExpressionOperator project = new LOProject(lp, new 
OperatorKey(scope, getNextId()), eOp, undollar(t1.image));
-               try {
-                       log.debug("eOp: " + eOp.getClass().getName() + " " + 
eOp);
-                       lp.add(project);
-                       log.debug("DollarVar: Added operator " + 
project.getClass().getName() + " " + project + " to logical plan " + lp);
+        if(bracketed && eOp instanceof LOUserFunc && 
((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+            try {
+                return attachColPosToReadScalar(lp, (ExpressionOperator)eOp, 
colNum, over);
+            } catch(Exception e) {
+                throw new ParseException("Invalid field in scalar" + e);
+            }
+        }
+        ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, 
getNextId()), eOp, undollar(t1.image));
+        try {
+            log.debug("eOp: " + eOp.getClass().getName() + " " + eOp);
+            lp.add(project);
+            log.debug("DollarVar: Added operator " + 
project.getClass().getName() + " " + project + " to logical plan " + lp);
             if((eOp instanceof ExpressionOperator) && (bracketed)) {
-                           lp.add(eOp);
-                           lp.connect(eOp, project);
+                lp.add(eOp);
+                lp.connect(eOp, project);
             }
-               } catch (Exception planException) {
-                       ParseException pe = new 
ParseException(planException.getMessage());
-                       pe.initCause(planException); 
-                       throw pe;
-               }
-               log.trace("Exiting DollarVar");
-               return project;
+        } catch (Exception planException) {
+            ParseException pe = new ParseException(planException.getMessage());
+            pe.initCause(planException); 
+            throw pe;
+        }
+        log.trace("Exiting DollarVar");
+        return project;
        }
 }
 
@@ -4415,56 +4414,37 @@ ExpressionOperator AliasFieldOrSpec(Sche
             } catch (FrontendException fee) {
                ParseException pe = new ParseException(fee.getMessage());
                pe.initCause(fee);
-                throw pe;
+               throw pe;
             }
-                       log.debug("Position of " + t1.image + " = " + i);
-                       if(null != over) {
-                               log.debug("Printing out the aliases in the 
schema");
-                               over.printAliases();
-                       }
-                       // Scalar Projections
-                       if(bracketed && eOp instanceof LOUserFunc && 
((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
-                               try {
-                                       scalarFound = false;
-                                       // Projections decides type of scalar, 
we need to add a Cast operator to track that
-                                       LOCast loCast;
-                                       if(over.getField(i).type != 
DataType.BYTEARRAY) {
-                                               loCast = new LOCast(lp, new 
OperatorKey(scope, getNextId()), over.getField(i).type);
-                                       } else {
-                                               // Default type is chararray 
not bytearray for ReadScalar, as it reads string from the file
-                                               loCast = new LOCast(lp, new 
OperatorKey(scope, getNextId()), DataType.CHARARRAY);
-                                       }
-                                       lp.add(loCast);
-                                       lp.connect(eOp, loCast);
-                                       
-                                       // We also need to attach LOConst to 
the userfunc 
-                                       // so that it can read that projection 
number in ReadScalars UDF
-                                       LOConst rconst = new LOConst(lp, new 
OperatorKey(scope, getNextId()), i);
-                       rconst.setType(DataType.INTEGER);
-                       lp.add(rconst);
-                       lp.connect(rconst, eOp);
-                       
-                       return loCast;
-                               } catch(Exception e) {
-                                       throw new ParseException("Invalid field 
in scalar" + e);
-                               }
-                       }
-                       item = new LOProject(lp, new OperatorKey(scope, 
getNextId()), eOp, i);
-                       item.setAlias(t1.image);
-                       try {
-                               lp.add(item);
-                               log.debug("AliasFieldOrSpec: Added operator " + 
item.getClass().getName() + " " + item + " to logical plan " + lp);
+            log.debug("Position of " + t1.image + " = " + i);
+            if(null != over) {
+                log.debug("Printing out the aliases in the schema");
+                over.printAliases();
+            }
+            // Scalar Projections
+            if(bracketed && eOp instanceof LOUserFunc && 
((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+                try {
+                    return attachColPosToReadScalar(lp, 
(ExpressionOperator)eOp, i, over);
+                } catch(Exception e) {
+                    throw new ParseException("Invalid field in scalar" + e);
+                }
+            }
+            item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, 
i);
+            item.setAlias(t1.image);
+            try {
+                lp.add(item);
+                log.debug("AliasFieldOrSpec: Added operator " + 
item.getClass().getName() + " " + item + " to logical plan " + lp);
                 if((eOp instanceof ExpressionOperator) && (bracketed)) {
-                                   lp.add(eOp);
-                                   lp.connect(eOp, item);
+                    lp.add(eOp);
+                    lp.connect(eOp, item);
                 }
-                       } catch (Exception planException) {
-                               ParseException parseException = new 
ParseException(planException.getMessage());
-                               parseException.initCause(planException);
-                               throw parseException;
-                       }
+            } catch (Exception planException) {
+                ParseException parseException = new 
ParseException(planException.getMessage());
+                parseException.initCause(planException);
+                throw parseException;
+            }
                }
-               
+
                log.trace("Exiting AliasFieldOrSpec");
                return item;
        }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 Thu Sep  2 00:01:56 2010
@@ -1774,7 +1774,15 @@ public class TypeCheckingVisitor extends
         // set here. This is a special case where output type is not
         // automatically determined.
         
-        if(inputType == DataType.BYTEARRAY) {
+        if(inputType == DataType.BYTEARRAY || 
+                (       // a hack . need to add a caster for LOUserFunc if its 
for
+                        // scalar alias, as the dependency on predecessor LO 
is not
+                        // managed correctly, and might result in result type 
getting
+                        // set as bytearray later on
+                        cast.getExpression() instanceof LOUserFunc && 
+                        
((LOUserFunc)cast.getExpression()).getImplicitReferencedOperator() != null
+                )
+        ) {
             try {
                Map<String, LogicalOperator> canonicalMap = 
cast.getFieldSchema().getCanonicalMap();
                for( Map.Entry<String, LogicalOperator> entry : 
canonicalMap.entrySet() ) {
@@ -3068,9 +3076,12 @@ public class TypeCheckingVisitor extends
         MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, 
FuncSpec>();
         if(op instanceof ExpressionOperator) {
             if(op instanceof LOUserFunc) {
-                return null;
-            }
-            
+                if(((LOUserFunc)op).getImplicitReferencedOperator() == null){
+                    // in case of scalar alias user function, proceed and go 
to the parent
+                    // in case of other user functions, stop here and return 
null
+                    return null;
+                }
+            }            
             Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
             if( parentCanonicalName != null ) {
                fs = fs.findFieldSchema( parentCanonicalName );

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 Thu Sep  2 00:01:56 2010
@@ -24,13 +24,19 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import 
org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 
 public class UserFuncExpression extends LogicalExpression {
 
@@ -94,6 +100,30 @@ public class UserFuncExpression extends 
     public LogicalSchema.LogicalFieldSchema getFieldSchema() throws 
FrontendException {
         if (fieldSchema!=null)
             return fieldSchema;
+        
+        if(implicitReferencedOperator != null &&
+                
mFuncSpec.getClassName().equals("org.apache.pig.impl.builtin.ReadScalars")){
+            // if this is a ReadScalars udf for scalar operation, use the 
+            // FieldSchema corresponding to this position in input 
+            List<Operator> args = plan.getSuccessors(this);
+            if(args != null && args.size() > 0 ){
+                int pos = 
(Integer)((ConstantExpression)args.get(0)).getValue();
+                LogicalRelationalOperator inp = 
(LogicalRelationalOperator)implicitReferencedOperator;
+
+                if( inp.getSchema() != null){
+                    LogicalFieldSchema inpFs = inp.getSchema().getField(pos);
+                    fieldSchema = new LogicalFieldSchema(inpFs);
+                    //  fieldSchema.alias = "ReadScalars_" + fieldSchema.alias;
+                }else{
+                    fieldSchema = new LogicalFieldSchema(null, null, 
DataType.BYTEARRAY);
+                }
+                return fieldSchema;
+            }else{
+                //predecessors haven't been setup, return null
+                return null;
+            }
+        }
+
         LogicalSchema inputSchema = new LogicalSchema();
         List<Operator> succs = plan.getSuccessors(this);
 
@@ -137,11 +167,12 @@ public class UserFuncExpression extends 
 
     @Override
     public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws 
FrontendException {
-        LogicalExpression copy =  null; 
+        UserFuncExpression copy =  null; 
         try {
-        copy = new UserFuncExpression(
-                lgExpPlan,
-                this.getFuncSpec().clone() );
+            copy = new UserFuncExpression(
+                    lgExpPlan,
+                    this.getFuncSpec().clone() );
+            
copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator());
         } catch(CloneNotSupportedException e) {
              e.printStackTrace();
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=991772&r1=991771&r2=991772&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Thu Sep  2 
00:01:56 2010
@@ -17,25 +17,26 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestScalarAliases extends TestCase {
+public class TestScalarAliases  {
     static MiniCluster cluster = MiniCluster.buildCluster();
     private PigServer pigServer;
 
@@ -43,10 +44,9 @@ public class TestScalarAliases extends T
     BagFactory mBf = BagFactory.getInstance();
 
     @Before
-    @Override
     public void setUp() throws Exception{
-        FileLocalizer.setR(new Random());
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        FileLocalizer.setInitialized(false);
+        pigServer = new PigServer(ExecType.LOCAL);
     }
 
     @AfterClass
@@ -64,7 +64,7 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "table_testScalarAliasesBatch", input);
+        Util.createLocalInputFile( "table_testScalarAliasesBatch", input);
         // Test in script mode
         pigServer.setBatchOn();
         pigServer.registerQuery("A = LOAD 'table_testScalarAliasesBatch' as 
(a0: long, a1: double);");
@@ -103,6 +103,9 @@ public class TestScalarAliases extends T
         assertTrue(t.toString().equals("(9,1.0)"));
 
         assertFalse(iter.hasNext());
+        
+        Util.deleteDirectory(new File("table_testScalarAliasesDir"));
+
     }
 
     // See PIG-1434
@@ -115,7 +118,7 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "table_testUseScalarMultipleTimes", 
input);
+        Util.createLocalInputFile( "table_testUseScalarMultipleTimes", input);
         pigServer.setBatchOn();
         pigServer.registerQuery("A = LOAD 'table_testUseScalarMultipleTimes' 
as (a0: long, a1: double);");
         pigServer.registerQuery("B = group A all;");
@@ -188,6 +191,10 @@ public class TestScalarAliases extends T
         assertTrue(t.toString().equals("(23.0,60.0)"));
 
         assertFalse(iter.hasNext());
+        
+        Util.deleteDirectory(new File("table_testUseScalarMultipleTimesOutY"));
+        Util.deleteDirectory(new File("table_testUseScalarMultipleTimesOutZ"));
+        
     }
 
     // See PIG-1434
@@ -201,24 +208,14 @@ public class TestScalarAliases extends T
                 "2\t10",
                 "3\t20"
         };
-        Util.createInputFile(cluster, "table_testScalarWithNoSchema", input);
-        Util.createInputFile(cluster, "table_testScalarWithNoSchemaScalar", 
scalarInput);
+        Util.createLocalInputFile( "table_testScalarWithNoSchema", input);
+        Util.createLocalInputFile( "table_testScalarWithNoSchemaScalar", 
scalarInput);
         // Load A as a scalar
         pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchema';");
         pigServer.registerQuery("scalar = LOAD 
'table_testScalarWithNoSchemaScalar' as (count, total);");
         pigServer.registerQuery("B = foreach A generate 5 / scalar.total;");
 
-        try {
-            pigServer.openIterator("B");
-            fail("We do not support no schema scalar without a cast");
-        } catch (FrontendException te) {
-            // In alias B, incompatible types in Division Operator left hand 
side:int right hand side:chararray
-            
assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode()
 == 1039);
-        }
-
-        pigServer.registerQuery("C = foreach A generate 5 / 
(int)scalar.total;");
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
 
         Tuple t = iter.next();
         assertTrue(t.get(0).toString().equals("1"));
@@ -249,8 +246,8 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "testScalarWithTwoBranchesA", inputA);
-        Util.createInputFile(cluster, "testScalarWithTwoBranchesX", inputX);
+        Util.createLocalInputFile( "testScalarWithTwoBranchesA", inputA);
+        Util.createLocalInputFile( "testScalarWithTwoBranchesX", inputX);
         // Test in script mode
         pigServer.setBatchOn();
         pigServer.registerQuery("A = LOAD 'testScalarWithTwoBranchesA' as (a0: 
long, a1: double);");
@@ -289,6 +286,9 @@ public class TestScalarAliases extends T
         assertTrue(t.toString().equals("(rocks,20.0)"));
 
         assertFalse(iter.hasNext());
+        
+        Util.deleteDirectory(new File("testScalarWithTwoBranchesDir"));
+
     }
 
     // See PIG-1434
@@ -301,16 +301,18 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "table_testFilteredScalarDollarProj", 
input);
+        Util.createLocalInputFile( "table_testFilteredScalarDollarProj", 
input);
         // Test in script mode
         pigServer.setBatchOn();
         pigServer.registerQuery("A = LOAD 'table_testFilteredScalarDollarProj' 
as (a0: long, a1: double);");
         pigServer.registerQuery("B = filter A by $1 < 8;");
         pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / 
B.$1);");
         pigServer.registerQuery("Store Y into 
'table_testFilteredScalarDollarProjDir';");
+        pigServer.explain("Y", System.err);
         pigServer.executeBatch();
         // Check output
         pigServer.registerQuery("Z = LOAD 
'table_testFilteredScalarDollarProjDir' as (a0: int, a1: double);");
+        pigServer.explain("Z", System.err);
 
         Iterator<Tuple> iter = pigServer.openIterator("Z");
 
@@ -339,6 +341,8 @@ public class TestScalarAliases extends T
 
         assertFalse(iter.hasNext());
 
+        Util.deleteDirectory(new 
File("table_testFilteredScalarDollarProjDir"));
+
     }
 
     // See PIG-1434
@@ -352,24 +356,14 @@ public class TestScalarAliases extends T
                 "2\t10",
                 "3\t20"
         };
-        Util.createInputFile(cluster, 
"table_testScalarWithNoSchemaDollarProj", input);
-        Util.createInputFile(cluster, 
"table_testScalarWithNoSchemaDollarProjScalar", scalarInput);
+        Util.createLocalInputFile( "table_testScalarWithNoSchemaDollarProj", 
input);
+        Util.createLocalInputFile( 
"table_testScalarWithNoSchemaDollarProjScalar", scalarInput);
         // Load A as a scalar
         pigServer.registerQuery("A = LOAD 
'table_testScalarWithNoSchemaDollarProj';");
         pigServer.registerQuery("scalar = LOAD 
'table_testScalarWithNoSchemaDollarProjScalar';");
         pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;");
 
-        try {
-            pigServer.openIterator("B");
-            fail("We do not support no schema scalar without a cast");
-        } catch (FrontendException te) {
-            // In alias B, incompatible types in Division Operator left hand 
side:int right hand side:chararray
-            
assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode()
 == 1039);
-        }
-
-        pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.$1;");
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
 
         Tuple t = iter.next();
         assertTrue(t.get(0).toString().equals("1"));
@@ -399,8 +393,8 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseA", 
inputA);
-        Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseB", 
inputB);
+        Util.createLocalInputFile( "table_testScalarAliasesJoinClauseA", 
inputA);
+        Util.createLocalInputFile( "table_testScalarAliasesJoinClauseB", 
inputB);
         // Test in script mode
         pigServer.registerQuery("A = LOAD 'table_testScalarAliasesJoinClauseA' 
as (a0, a1);");
         pigServer.registerQuery("G = group A all;");
@@ -435,7 +429,7 @@ public class TestScalarAliases extends T
         };
 
         // Test the use of scalars in expressions
-        Util.createInputFile(cluster, "table_testScalarAliasesFilterClause", 
input);
+        Util.createLocalInputFile( "table_testScalarAliasesFilterClause", 
input);
         // Test in script mode
         pigServer.registerQuery("A = LOAD 
'table_testScalarAliasesFilterClause' as (a0, a1);");
         pigServer.registerQuery("G = group A all;");
@@ -458,6 +452,7 @@ public class TestScalarAliases extends T
     // See PIG-1434
     @Test
     public void testScalarAliasesSplitClause() throws Exception{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         String[] input = {
                 "1\t5",
                 "2\t10",
@@ -487,7 +482,10 @@ public class TestScalarAliases extends T
         assertTrue(t.toString().equals("(3,20.0)"));
 
         assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, "table_testScalarAliasesSplitClauseDir");
+
     }
+    
 
     // See PIG-1434
     @Test
@@ -498,7 +496,7 @@ public class TestScalarAliases extends T
                 "3\t20"
         };
 
-        Util.createInputFile(cluster, "table_testScalarAliasesGrammar", input);
+        Util.createLocalInputFile( "table_testScalarAliasesGrammar", input);
         pigServer.registerQuery("A = LOAD 'table_testScalarAliasesGrammar' as 
(a0: long, a1: double);");
         pigServer.registerQuery("B = group A all;");
         pigServer.registerQuery("C = foreach B generate COUNT(A);");


Reply via email to