Author: thejas Date: Thu Sep 2 00:01:56 2010 New Revision: 991772 URL: http://svn.apache.org/viewvc?rev=991772&view=rev Log: PIG-1572: change the default datatype of relations used as scalars from chararray to bytearray
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Sep 2 00:01:56 2010 @@ -193,6 +193,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1572: change default datatype when relations are used as scalar to bytearray (thejas) + PIG-1583: piggybank unit test TestLookupInFiles is broken (daijy) PIG-1563: some of string functions don't work on bytearrays (olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Sep 2 00:01:56 2010 @@ -63,6 +63,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.InterStorage; import org.apache.pig.impl.logicalLayer.FrontendException; import 
org.apache.pig.impl.logicalLayer.LOConst; import org.apache.pig.impl.logicalLayer.LODefine; @@ -634,7 +635,13 @@ public class PigServer { try { LogicalPlan lp = getPlanFromAlias(alias, "describe"); lp = compileLp(alias, false); - Schema schema = lp.getLeaves().get(0).getSchema(); + Schema schema = null; + for(LogicalOperator lo : lp.getLeaves()){ + if(lo.getAlias().equals(alias)){ + schema = lo.getSchema(); + break; + } + } if (schema != null) System.out.println(alias + ": " + schema.toString()); else System.out.println("Schema for " + alias + " unknown."); return schema; @@ -896,6 +903,7 @@ public class PigServer { if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("true") ) { LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp); migrator.visit(); + migrator.finish(); org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan(); HashSet<String> optimizerRules = null; @@ -1235,7 +1243,7 @@ public class PigServer { } else { // add new store - FuncSpec funcSpec = new FuncSpec(PigStorage.class.getName() + "()"); + FuncSpec funcSpec = new FuncSpec(InterStorage.class.getName()); fileSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), funcSpec); store = new LOStore(referredPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), fileSpec, alias); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Sep 2 00:01:56 2010 @@ -467,4 +467,8 @@ public class POUserFunc extends Expressi ((Accumulator)func).cleanup(); } } + + public void setResultType(byte resultType) { + this.resultType = resultType; + } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Thu Sep 2 00:01:56 2010 @@ -17,17 +17,16 @@ */ package org.apache.pig.impl.builtin; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.FileLocalizer; -import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.io.InterStorage; +import org.apache.pig.impl.io.ReadToEndLoader; +import org.apache.pig.impl.util.UDFContext; /** * ReadScalars reads a line from a file and returns it as its value. The @@ -35,10 +34,10 @@ import org.apache.pig.impl.logicalLayer. * This is useful for incorporating a result from an agregation into another * evaluation. 
*/ -public class ReadScalars extends EvalFunc<String> { +public class ReadScalars extends EvalFunc<Object> { private String scalarfilename = null; - private String charset = "UTF-8"; - private String value = null; + // private String charset = "UTF-8"; + private Object value = null; /** * Java level API @@ -48,52 +47,44 @@ public class ReadScalars extends EvalFun * read */ @Override - public String exec(Tuple input) throws IOException { + public Object exec(Tuple input) throws IOException { if (value == null) { if (input == null || input.size() == 0) return null; - InputStream is; - BufferedReader reader; int pos; + ReadToEndLoader loader; try { pos = DataType.toInteger(input.get(0)); scalarfilename = DataType.toString(input.get(1)); - - is = FileLocalizer.openDFSFile(scalarfilename); - reader = new BufferedReader(new InputStreamReader(is, charset)); + loader = new ReadToEndLoader( + new InterStorage(), + UDFContext.getUDFContext().getJobConf(), + scalarfilename, 0 + ); } catch (Exception e) { throw new ExecException("Failed to open file '" + scalarfilename + "'; error = " + e.getMessage()); } try { - String line = reader.readLine(); - if(line == null) { - log.warn("No scalar field to read, returning null"); - return null; - } - String[] lineTok = line.split("\t"); - if(pos > lineTok.length) { + Tuple t1 = loader.getNext(); + if(t1 == null){ log.warn("No scalar field to read, returning null"); return null; } - value = lineTok[pos]; - if(reader.readLine() != null) { - throw new ExecException("Scalar has more than one row in the output"); + value = t1.get(pos); + Tuple t2 = loader.getNext(); + if(t2 != null){ + String msg = "Scalar has more than one row in the output. 
" + + "1st : " + t1 + ", 2nd :" + t2; + throw new ExecException(msg); } + } catch (Exception e) { throw new ExecException(e.getMessage()); - } finally { - reader.close(); - is.close(); - } + } } return value; } - @Override - public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(getSchemaName("ReadScalars", input), - DataType.CHARARRAY)); - } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Thu Sep 2 00:01:56 2010 @@ -26,6 +26,7 @@ import org.apache.pig.PigException; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.data.DataType; import org.apache.pig.impl.plan.OperatorKey; @@ -56,10 +57,20 @@ public class LOUserFunc extends Expressi return mFuncSpec; } + /** + * Used for scalar alias. return the source logical operator + * this ReadScalars udf is associated with + * @return input logical operator + */ public LogicalOperator getImplicitReferencedOperator() { return implicitReferencedOperator; } + /** + * Used for scalar alias. 
set the source logical operator + * this ReadScalars udf is associated with + * @param implicitReferencedOperator input logical operator + */ public void setImplicitReferencedOperator( LogicalOperator implicitReferencedOperator) { this.implicitReferencedOperator = implicitReferencedOperator; @@ -95,6 +106,33 @@ public class LOUserFunc extends Expressi @Override public Schema.FieldSchema getFieldSchema() throws FrontendException { if(!mIsFieldSchemaComputed) { + + if(implicitReferencedOperator != null ){ + // if this is a ReadScalars udf for scalar operation, use the + // FieldSchema corresponding to this position in input + List<ExpressionOperator> args = getArguments(); + if(args != null && args.size() > 0 ){ + int pos = (Integer)((LOConst)getArguments().get(0)).getValue(); + LogicalOperator inp = implicitReferencedOperator; + if(inp.getSchema() != null){ + // input logical operator has schema, copy and link + //to corresponding FieldSchema + FieldSchema inpFs = inp.getSchema().getField(pos); + mFieldSchema = Schema.FieldSchema.copyAndLink(inpFs, inp); + }else{ + // no schema for input logicaloperator, use bytearray + // and set it as parent + mFieldSchema = new FieldSchema(null, DataType.BYTEARRAY); + mFieldSchema.setParent(null, inp); + } + mIsFieldSchemaComputed = true; + return mFieldSchema; + }else{ + //predecessors haven't been setup, return null + return null; + } + } + Schema inputSchema = new Schema(); List<ExpressionOperator> args = getArguments(); for(ExpressionOperator op: args) { Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Sep 2 00:01:56 2010 @@ -770,6 +770,27 @@ public class QueryParser { static String constructFileNameSignature(String fileName, FuncSpec funcSpec) { return fileName+"_"+funcSpec.toString(); } + + ExpressionOperator attachColPosToReadScalar(LogicalPlan lp, + ExpressionOperator expr, int colNum, Schema over) + throws PlanException, FrontendException{ + + scalarFound = false; + // We also need to attach LOConst to the userfunc + // so that it can read that projection number in ReadScalars UDF + LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum); + rconst.setType(DataType.INTEGER); + lp.add(rconst); + lp.connect(rconst, expr); + + if(over != null && over.getField(colNum).type != DataType.BYTEARRAY) { + LOCast loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(colNum).type); + lp.add(loCast); + lp.connect(expr, loCast); + expr = loCast; + } + return expr; + } } @@ -4268,51 +4289,29 @@ ExpressionOperator DollarVar(Schema over } } // Scalar Projections - if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) { - try { - scalarFound = false; - // Projections decides type of scalar, we need to add a Cast operator to track that - LOCast loCast = null; - if(over != null) { - if(over.getField(colNum).type != DataType.BYTEARRAY) { - loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(colNum).type); - } - } - if(loCast == null){ - // Default type is chararray not bytearray for ReadScalar, as it reads string from the file - loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY); - } - lp.add(loCast); - lp.connect(eOp, loCast); - - // We also need to attach LOConst to the userfunc - // so that it can read that projection number in ReadScalars UDF - LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum); - 
rconst.setType(DataType.INTEGER); - lp.add(rconst); - lp.connect(rconst, eOp); - - return loCast; - } catch(Exception e) { - throw new ParseException("Invalid field in scalar" + e); - } - } - ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image)); - try { - log.debug("eOp: " + eOp.getClass().getName() + " " + eOp); - lp.add(project); - log.debug("DollarVar: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp); + if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) { + try { + return attachColPosToReadScalar(lp, (ExpressionOperator)eOp, colNum, over); + } catch(Exception e) { + throw new ParseException("Invalid field in scalar" + e); + } + } + ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image)); + try { + log.debug("eOp: " + eOp.getClass().getName() + " " + eOp); + lp.add(project); + log.debug("DollarVar: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp); if((eOp instanceof ExpressionOperator) && (bracketed)) { - lp.add(eOp); - lp.connect(eOp, project); + lp.add(eOp); + lp.connect(eOp, project); } - } catch (Exception planException) { - ParseException pe = new ParseException(planException.getMessage()); - pe.initCause(planException); - throw pe; - } - log.trace("Exiting DollarVar"); - return project; + } catch (Exception planException) { + ParseException pe = new ParseException(planException.getMessage()); + pe.initCause(planException); + throw pe; + } + log.trace("Exiting DollarVar"); + return project; } } @@ -4415,56 +4414,37 @@ ExpressionOperator AliasFieldOrSpec(Sche } catch (FrontendException fee) { ParseException pe = new ParseException(fee.getMessage()); pe.initCause(fee); - throw pe; + throw pe; } - log.debug("Position of " + t1.image + " = " + i); - if(null != over) { - log.debug("Printing out the aliases in the 
schema"); - over.printAliases(); - } - // Scalar Projections - if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) { - try { - scalarFound = false; - // Projections decides type of scalar, we need to add a Cast operator to track that - LOCast loCast; - if(over.getField(i).type != DataType.BYTEARRAY) { - loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(i).type); - } else { - // Default type is chararray not bytearray for ReadScalar, as it reads string from the file - loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY); - } - lp.add(loCast); - lp.connect(eOp, loCast); - - // We also need to attach LOConst to the userfunc - // so that it can read that projection number in ReadScalars UDF - LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), i); - rconst.setType(DataType.INTEGER); - lp.add(rconst); - lp.connect(rconst, eOp); - - return loCast; - } catch(Exception e) { - throw new ParseException("Invalid field in scalar" + e); - } - } - item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i); - item.setAlias(t1.image); - try { - lp.add(item); - log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical plan " + lp); + log.debug("Position of " + t1.image + " = " + i); + if(null != over) { + log.debug("Printing out the aliases in the schema"); + over.printAliases(); + } + // Scalar Projections + if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) { + try { + return attachColPosToReadScalar(lp, (ExpressionOperator)eOp, i, over); + } catch(Exception e) { + throw new ParseException("Invalid field in scalar" + e); + } + } + item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i); + item.setAlias(t1.image); + try { + lp.add(item); + log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical 
plan " + lp); if((eOp instanceof ExpressionOperator) && (bracketed)) { - lp.add(eOp); - lp.connect(eOp, item); + lp.add(eOp); + lp.connect(eOp, item); } - } catch (Exception planException) { - ParseException parseException = new ParseException(planException.getMessage()); - parseException.initCause(planException); - throw parseException; - } + } catch (Exception planException) { + ParseException parseException = new ParseException(planException.getMessage()); + parseException.initCause(planException); + throw parseException; + } } - + log.trace("Exiting AliasFieldOrSpec"); return item; } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Sep 2 00:01:56 2010 @@ -1774,7 +1774,15 @@ public class TypeCheckingVisitor extends // set here. This is a special case where output type is not // automatically determined. - if(inputType == DataType.BYTEARRAY) { + if(inputType == DataType.BYTEARRAY || + ( // a hack . 
need to add a caster for LOUserFunc if its for + // scalar alias, as the dependency on predecessor LO is not + // managed correctly, and might result in result type getting + // set as bytearray later on + cast.getExpression() instanceof LOUserFunc && + ((LOUserFunc)cast.getExpression()).getImplicitReferencedOperator() != null + ) + ) { try { Map<String, LogicalOperator> canonicalMap = cast.getFieldSchema().getCanonicalMap(); for( Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) { @@ -3068,9 +3076,12 @@ public class TypeCheckingVisitor extends MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>(); if(op instanceof ExpressionOperator) { if(op instanceof LOUserFunc) { - return null; - } - + if(((LOUserFunc)op).getImplicitReferencedOperator() == null){ + // in case of scalar alias user function, proceed and go to the parent + // in case of other user functions, stop here and return null + return null; + } + } Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema(); if( parentCanonicalName != null ) { fs = fs.findFieldSchema( parentCanonicalName ); Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Thu Sep 2 00:01:56 2010 @@ -24,13 +24,19 @@ import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.ExpressionOperator; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.LOConst; 
+import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; import org.apache.pig.newplan.PlanVisitor; import org.apache.pig.newplan.logical.Util; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; public class UserFuncExpression extends LogicalExpression { @@ -94,6 +100,30 @@ public class UserFuncExpression extends public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException { if (fieldSchema!=null) return fieldSchema; + + if(implicitReferencedOperator != null && + mFuncSpec.getClassName().equals("org.apache.pig.impl.builtin.ReadScalars")){ + // if this is a ReadScalars udf for scalar operation, use the + // FieldSchema corresponding to this position in input + List<Operator> args = plan.getSuccessors(this); + if(args != null && args.size() > 0 ){ + int pos = (Integer)((ConstantExpression)args.get(0)).getValue(); + LogicalRelationalOperator inp = (LogicalRelationalOperator)implicitReferencedOperator; + + if( inp.getSchema() != null){ + LogicalFieldSchema inpFs = inp.getSchema().getField(pos); + fieldSchema = new LogicalFieldSchema(inpFs); + // fieldSchema.alias = "ReadScalars_" + fieldSchema.alias; + }else{ + fieldSchema = new LogicalFieldSchema(null, null, DataType.BYTEARRAY); + } + return fieldSchema; + }else{ + //predecessors haven't been setup, return null + return null; + } + } + LogicalSchema inputSchema = new LogicalSchema(); List<Operator> succs = plan.getSuccessors(this); @@ -137,11 +167,12 @@ public class UserFuncExpression extends @Override public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException { - LogicalExpression copy = 
null; + UserFuncExpression copy = null; try { - copy = new UserFuncExpression( - lgExpPlan, - this.getFuncSpec().clone() ); + copy = new UserFuncExpression( + lgExpPlan, + this.getFuncSpec().clone() ); + copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator()); } catch(CloneNotSupportedException e) { e.printStackTrace(); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=991772&r1=991771&r2=991772&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Thu Sep 2 00:01:56 2010 @@ -17,25 +17,26 @@ */ package org.apache.pig.test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.Random; -import junit.framework.TestCase; - import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.io.FileLocalizer; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -public class TestScalarAliases extends TestCase { +public class TestScalarAliases { static MiniCluster cluster = MiniCluster.buildCluster(); private PigServer pigServer; @@ -43,10 +44,9 @@ public class TestScalarAliases extends T BagFactory mBf = BagFactory.getInstance(); @Before - @Override public void setUp() throws Exception{ - FileLocalizer.setR(new Random()); - pigServer = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties()); + FileLocalizer.setInitialized(false); + pigServer = new PigServer(ExecType.LOCAL); } @AfterClass @@ -64,7 +64,7 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, "table_testScalarAliasesBatch", input); + Util.createLocalInputFile( "table_testScalarAliasesBatch", input); // Test in script mode pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'table_testScalarAliasesBatch' as (a0: long, a1: double);"); @@ -103,6 +103,9 @@ public class TestScalarAliases extends T assertTrue(t.toString().equals("(9,1.0)")); assertFalse(iter.hasNext()); + + Util.deleteDirectory(new File("table_testScalarAliasesDir")); + } // See PIG-1434 @@ -115,7 +118,7 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, "table_testUseScalarMultipleTimes", input); + Util.createLocalInputFile( "table_testUseScalarMultipleTimes", input); pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'table_testUseScalarMultipleTimes' as (a0: long, a1: double);"); pigServer.registerQuery("B = group A all;"); @@ -188,6 +191,10 @@ public class TestScalarAliases extends T assertTrue(t.toString().equals("(23.0,60.0)")); assertFalse(iter.hasNext()); + + Util.deleteDirectory(new File("table_testUseScalarMultipleTimesOutY")); + Util.deleteDirectory(new File("table_testUseScalarMultipleTimesOutZ")); + } // See PIG-1434 @@ -201,24 +208,14 @@ public class TestScalarAliases extends T "2\t10", "3\t20" }; - Util.createInputFile(cluster, "table_testScalarWithNoSchema", input); - Util.createInputFile(cluster, "table_testScalarWithNoSchemaScalar", scalarInput); + Util.createLocalInputFile( "table_testScalarWithNoSchema", input); + Util.createLocalInputFile( "table_testScalarWithNoSchemaScalar", scalarInput); // Load A as a scalar pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchema';"); pigServer.registerQuery("scalar = LOAD 
'table_testScalarWithNoSchemaScalar' as (count, total);"); pigServer.registerQuery("B = foreach A generate 5 / scalar.total;"); - try { - pigServer.openIterator("B"); - fail("We do not support no schema scalar without a cast"); - } catch (FrontendException te) { - // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray - assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode() == 1039); - } - - pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.total;"); - - Iterator<Tuple> iter = pigServer.openIterator("C"); + Iterator<Tuple> iter = pigServer.openIterator("B"); Tuple t = iter.next(); assertTrue(t.get(0).toString().equals("1")); @@ -249,8 +246,8 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, "testScalarWithTwoBranchesA", inputA); - Util.createInputFile(cluster, "testScalarWithTwoBranchesX", inputX); + Util.createLocalInputFile( "testScalarWithTwoBranchesA", inputA); + Util.createLocalInputFile( "testScalarWithTwoBranchesX", inputX); // Test in script mode pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'testScalarWithTwoBranchesA' as (a0: long, a1: double);"); @@ -289,6 +286,9 @@ public class TestScalarAliases extends T assertTrue(t.toString().equals("(rocks,20.0)")); assertFalse(iter.hasNext()); + + Util.deleteDirectory(new File("testScalarWithTwoBranchesDir")); + } // See PIG-1434 @@ -301,16 +301,18 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, "table_testFilteredScalarDollarProj", input); + Util.createLocalInputFile( "table_testFilteredScalarDollarProj", input); // Test in script mode pigServer.setBatchOn(); pigServer.registerQuery("A = LOAD 'table_testFilteredScalarDollarProj' as (a0: long, a1: double);"); pigServer.registerQuery("B = filter A by $1 < 8;"); pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 
/ B.$1);"); pigServer.registerQuery("Store Y into 'table_testFilteredScalarDollarProjDir';"); + pigServer.explain("Y", System.err); pigServer.executeBatch(); // Check output pigServer.registerQuery("Z = LOAD 'table_testFilteredScalarDollarProjDir' as (a0: int, a1: double);"); + pigServer.explain("Z", System.err); Iterator<Tuple> iter = pigServer.openIterator("Z"); @@ -339,6 +341,8 @@ public class TestScalarAliases extends T assertFalse(iter.hasNext()); + Util.deleteDirectory(new File("table_testFilteredScalarDollarProjDir")); + } // See PIG-1434 @@ -352,24 +356,14 @@ public class TestScalarAliases extends T "2\t10", "3\t20" }; - Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProj", input); - Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProjScalar", scalarInput); + Util.createLocalInputFile( "table_testScalarWithNoSchemaDollarProj", input); + Util.createLocalInputFile( "table_testScalarWithNoSchemaDollarProjScalar", scalarInput); // Load A as a scalar pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchemaDollarProj';"); pigServer.registerQuery("scalar = LOAD 'table_testScalarWithNoSchemaDollarProjScalar';"); pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;"); - try { - pigServer.openIterator("B"); - fail("We do not support no schema scalar without a cast"); - } catch (FrontendException te) { - // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray - assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode() == 1039); - } - - pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.$1;"); - - Iterator<Tuple> iter = pigServer.openIterator("C"); + Iterator<Tuple> iter = pigServer.openIterator("B"); Tuple t = iter.next(); assertTrue(t.get(0).toString().equals("1")); @@ -399,8 +393,8 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, 
"table_testScalarAliasesJoinClauseA", inputA); - Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseB", inputB); + Util.createLocalInputFile( "table_testScalarAliasesJoinClauseA", inputA); + Util.createLocalInputFile( "table_testScalarAliasesJoinClauseB", inputB); // Test in script mode pigServer.registerQuery("A = LOAD 'table_testScalarAliasesJoinClauseA' as (a0, a1);"); pigServer.registerQuery("G = group A all;"); @@ -435,7 +429,7 @@ public class TestScalarAliases extends T }; // Test the use of scalars in expressions - Util.createInputFile(cluster, "table_testScalarAliasesFilterClause", input); + Util.createLocalInputFile( "table_testScalarAliasesFilterClause", input); // Test in script mode pigServer.registerQuery("A = LOAD 'table_testScalarAliasesFilterClause' as (a0, a1);"); pigServer.registerQuery("G = group A all;"); @@ -458,6 +452,7 @@ public class TestScalarAliases extends T // See PIG-1434 @Test public void testScalarAliasesSplitClause() throws Exception{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); String[] input = { "1\t5", "2\t10", @@ -487,7 +482,10 @@ public class TestScalarAliases extends T assertTrue(t.toString().equals("(3,20.0)")); assertFalse(iter.hasNext()); + Util.deleteFile(cluster, "table_testScalarAliasesSplitClauseDir"); + } + // See PIG-1434 @Test @@ -498,7 +496,7 @@ public class TestScalarAliases extends T "3\t20" }; - Util.createInputFile(cluster, "table_testScalarAliasesGrammar", input); + Util.createLocalInputFile( "table_testScalarAliasesGrammar", input); pigServer.registerQuery("A = LOAD 'table_testScalarAliasesGrammar' as (a0: long, a1: double);"); pigServer.registerQuery("B = group A all;"); pigServer.registerQuery("C = foreach B generate COUNT(A);");