Author: gates Date: Wed Jan 20 18:34:41 2010 New Revision: 901317 URL: http://svn.apache.org/viewvc?rev=901317&view=rev Log: PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER, FOREACH. Checking in on behalf of Pradeep, who is out.
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=901317&r1=901316&r2=901317&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Jan 20 18:34:41 2010 @@ -145,6 +145,9 @@ BUG FIXES +PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER, +FORACH (pradeepkth via gates) + PIG-1143: Poisson Sample Loader should compute the number of samples required only once (sriranjan via olgan) Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=901317&r1=901316&r2=901317&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java Wed Jan 20 18:34:41 2010 @@ -26,6 +26,7 @@ import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.data.DataType; public class LOCast extends ExpressionOperator { @@ -34,6 +35,11 @@ private static final long serialVersionUID = 2L; private FuncSpec mLoadFuncSpec = null; + // store field schema representing the schema + // in user specified casts -this is so that if + 
// field schema is unset and then getFieldSchema is called we still + // rebuild the fieldschema correctly as specified by the user in the script + private FieldSchema userSpecifiedFieldSchema; /** * @@ -65,11 +71,22 @@ public Schema getSchema() { return mSchema; } + + + @Override + public void setFieldSchema(FieldSchema fs) throws FrontendException { + super.setFieldSchema(fs); + userSpecifiedFieldSchema = new Schema.FieldSchema(fs); + } @Override public Schema.FieldSchema getFieldSchema() throws FrontendException { if(!mIsFieldSchemaComputed) { - mFieldSchema = new Schema.FieldSchema(null, mType); + if(userSpecifiedFieldSchema != null) { + mFieldSchema = userSpecifiedFieldSchema; + } else { + mFieldSchema = new Schema.FieldSchema(null, mType); + } Schema.FieldSchema parFs = getExpression().getFieldSchema(); String canonicalName = (parFs != null ? parFs.canonicalName : null); mFieldSchema.setParent(canonicalName, getExpression()); Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=901317&r1=901316&r2=901317&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Wed Jan 20 18:34:41 2010 @@ -37,6 +37,7 @@ * the logical binary expression operator that has to be visited * @throws VisitorException */ + @Override protected void visit(BinaryExpressionOperator binOp) throws VisitorException { binOp.unsetFieldSchema(); @@ -49,6 +50,7 @@ * the logical unary operator that has to be visited * @throws VisitorException */ + @Override protected void visit(UnaryExpressionOperator uniOp) throws VisitorException { uniOp.unsetFieldSchema(); 
super.visit(uniOp); @@ -60,6 +62,7 @@ * the logical cogroup operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOCogroup cg) throws VisitorException { cg.unsetSchema(); super.visit(cg); @@ -71,6 +74,7 @@ * the logical sort operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOSort s) throws VisitorException { s.unsetSchema(); super.visit(s); @@ -82,6 +86,7 @@ * the logical limit operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOLimit limit) throws VisitorException { limit.unsetSchema(); super.visit(limit); @@ -94,6 +99,7 @@ * the logical filter operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOFilter filter) throws VisitorException { filter.unsetSchema(); super.visit(filter); @@ -105,6 +111,7 @@ * the logical split operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOSplit split) throws VisitorException { split.unsetSchema(); super.visit(split); @@ -116,6 +123,7 @@ * the logical foreach operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOForEach forEach) throws VisitorException { forEach.unsetSchema(); super.visit(forEach); @@ -128,6 +136,7 @@ * the user defined function * @throws VisitorException */ + @Override protected void visit(LOUserFunc func) throws VisitorException { func.unsetFieldSchema(); super.visit(func); @@ -138,6 +147,7 @@ * the logical binCond operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOBinCond binCond) throws VisitorException { binCond.unsetFieldSchema(); super.visit(binCond); @@ -149,64 +159,185 @@ * the logical cast operator that has to be visited * @throws VisitorException */ + @Override protected void visit(LOCast cast) throws VisitorException { + cast.unsetFieldSchema(); super.visit(cast); } + /** * * @param regexp * the 
logical regexp operator that has to be visited * @throws ParseException */ + @Override protected void visit(LORegexp regexp) throws VisitorException { regexp.unsetFieldSchema(); super.visit(regexp); } + @Override protected void visit(LOLoad load) throws VisitorException{ // Don't remove load's schema, it's not like it will change. And we // don't have a way to recover it. super.visit(load); } + @Override protected void visit(LOStore store) throws VisitorException{ store.unsetSchema(); super.visit(store); } + @Override protected void visit(LOConst c) throws VisitorException{ c.unsetSchema(); super.visit(c); } + @Override protected void visit(LOUnion u) throws VisitorException { u.unsetSchema(); super.visit(u); } + @Override protected void visit(LOSplitOutput sop) throws VisitorException { sop.unsetSchema(); super.visit(sop); } + @Override protected void visit(LODistinct dt) throws VisitorException { dt.unsetSchema(); super.visit(dt); } + @Override protected void visit(LOCross cs) throws VisitorException { cs.unsetSchema(); super.visit(cs); } + @Override protected void visit(LOProject project) throws VisitorException { project.unsetFieldSchema(); super.visit(project); } + @Override protected void visit(LOJoin join) throws VisitorException { join.unsetSchema(); super.visit(join); } + + @Override + protected void visit(ExpressionOperator op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOAdd op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOAnd binOp) throws VisitorException { + binOp.unsetFieldSchema(); + super.visit(binOp); + } + + @Override + public void visit(LODivide op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOEqual op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOGreaterThan op) throws VisitorException { + 
op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOGreaterThanEqual op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOIsNull uniOp) throws VisitorException { + uniOp.unsetFieldSchema(); + super.visit(uniOp); + } + + @Override + public void visit(LOLesserThan op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOLesserThanEqual op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOMapLookup op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOMod op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOMultiply op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LONegative op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LONot uniOp) throws VisitorException { + uniOp.unsetFieldSchema(); + super.visit(uniOp); + } + + @Override + public void visit(LONotEqual op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } + + @Override + public void visit(LOOr binOp) throws VisitorException { + binOp.unsetFieldSchema(); + super.visit(binOp); + } + + @Override + public void visit(LOSubtract op) throws VisitorException { + op.unsetFieldSchema(); + super.visit(op); + } } Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=901317&r1=901316&r2=901317&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java (original) +++ 
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java Wed Jan 20 18:34:41 2010 @@ -5533,6 +5533,79 @@ LogicalPlan plan = planTester.buildPlan("c = foreach b generate (chararray)viewinfo#'pos' as position;") ; // validate + runTypeCheckingValidator(plan); + + checkLoaderInCasts(plan, "org.apache.pig.builtin.PigStorage"); + } + + /** + * test various scenarios with two level map lookup + */ + @Test + public void testTwolevelMapLookupLineage() throws Exception { + List<String[]> queries = new ArrayList<String[]>(); + // CASE 1: LOAD -> FILTER -> FOREACH -> LIMIT -> STORE + queries.add(new String[] {"sds = LOAD '/my/data/location' " + + "AS (simpleFields:map[], mapFields:map[], listMapFields:map[]);", + "queries = FILTER sds BY mapFields#'page_params'#'query' " + + "is NOT NULL;", + "queries_rand = FOREACH queries GENERATE " + + "(CHARARRAY) (mapFields#'page_params'#'query') AS query_string;", + "queries_limit = LIMIT queries_rand 100;", + "STORE queries_limit INTO 'out';"}); + // CASE 2: LOAD -> FOREACH -> FILTER -> LIMIT -> STORE + queries.add(new String[]{"sds = LOAD '/my/data/location' " + + "AS (simpleFields:map[], mapFields:map[], listMapFields:map[]);", + "queries_rand = FOREACH sds GENERATE " + + "(CHARARRAY) (mapFields#'page_params'#'query') AS query_string;", + "queries = FILTER queries_rand BY query_string IS NOT null;", + "queries_limit = LIMIT queries 100;", + "STORE queries_limit INTO 'out';"}); + // CASE 3: LOAD -> FOREACH -> FOREACH -> FILTER -> LIMIT -> STORE + queries.add(new String[]{"sds = LOAD '/my/data/location' " + + "AS (simpleFields:map[], mapFields:map[], listMapFields:map[]);", + "params = FOREACH sds GENERATE " + + "(map[]) (mapFields#'page_params') AS params;", + "queries = FOREACH params " + + "GENERATE (CHARARRAY) (params#'query') AS query_string;", + "queries_filtered = FILTER queries BY query_string IS NOT null;", + "queries_limit = LIMIT queries_filtered 100;", + "STORE queries_limit INTO 'out';"}); + 
// CASE 4: LOAD -> FOREACH -> FOREACH -> LIMIT -> STORE + queries.add(new String[]{"sds = LOAD '/my/data/location' " + + "AS (simpleFields:map[], mapFields:map[], listMapFields:map[]);", + "params = FOREACH sds GENERATE" + + " (map[]) (mapFields#'page_params') AS params;", + "queries = FOREACH params GENERATE " + + "(CHARARRAY) (params#'query') AS query_string;", + "queries_limit = LIMIT queries 100;", + "STORE queries_limit INTO 'out';"}); + // CASE 5: LOAD -> FOREACH -> FOREACH -> FOREACH -> LIMIT -> STORE + queries.add(new String[]{"sds = LOAD '/my/data/location' " + + "AS (simpleFields:map[], mapFields:map[], listMapFields:map[]);", + "params = FOREACH sds GENERATE " + + "(map[]) (mapFields#'page_params') AS params;", + "queries = FOREACH params GENERATE " + + "(CHARARRAY) (params#'query') AS query_string;", + "rand_queries = FOREACH queries GENERATE query_string as query;", + "queries_limit = LIMIT rand_queries 100;", + "STORE rand_queries INTO 'out';"}); + + for (String[] query: queries) { + LogicalPlan lp = null; + for (String queryLine : query) { + lp = planTester.buildPlan(queryLine); + } + + // validate + runTypeCheckingValidator(lp); + checkLoaderInCasts(lp, "org.apache.pig.builtin.PigStorage"); + + } + } + + private void runTypeCheckingValidator(LogicalPlan plan) throws + PlanValidationException { CompilationMessageCollector collector = new CompilationMessageCollector() ; TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; typeValidator.validate(plan, collector) ; @@ -5544,12 +5617,16 @@ if (collector.hasError()) { throw new AssertionError("Expect no error") ; } - + } + + private void checkLoaderInCasts(LogicalPlan plan, String loaderClassName) + throws VisitorException { CastFinder cf = new CastFinder(plan); cf.visit(); List<LOCast> casts = cf.casts; for (LOCast cast : casts) { - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); + 
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith( + loaderClassName)); } } @@ -5708,6 +5785,7 @@ */ public static class TestBinCondFieldSchema extends EvalFunc<DataBag> { //no-op exec method + @Override public DataBag exec(Tuple input) { return null; }