Author: gates
Date: Tue Jan 19 19:39:51 2010
New Revision: 900926
URL: http://svn.apache.org/viewvc?rev=900926&view=rev
Log:
PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,
FOREACH. Checking in for Pradeep since he's out.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCast.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=900926&r1=900925&r2=900926&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 19 19:39:51 2010
@@ -78,6 +78,9 @@
BUG FIXES
+PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,
+ FOREACH (pradeepkth via gates)
+
PIG-1171: Top-N queries produce incorrect results when followed by a cross
statement (rding via olgan)
PIG-1159: merge join right side table does not support comma seperated paths
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=900926&r1=900925&r2=900926&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCast.java Tue Jan 19 19:39:51 2010
@@ -26,6 +26,7 @@
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.data.DataType;
public class LOCast extends ExpressionOperator {
@@ -34,6 +35,11 @@
private static final long serialVersionUID = 2L;
private FuncSpec mLoadFuncSpec = null;
+ // Copy of the field schema most recently passed to setFieldSchema, e.g. the
+ // type a user wrote in an explicit cast. It is kept so that if the field
+ // schema is later unset and getFieldSchema is called again, the field schema
+ // is rebuilt exactly as the user specified it in the script.
+ private FieldSchema userSpecifiedFieldSchema;
/**
*
@@ -65,11 +71,22 @@
/**
 * Returns the schema stored in this operator's mSchema field, as-is (no copy).
 */
public Schema getSchema() {
    final Schema current = mSchema;
    return current;
}
+
+
+ /**
+  * Sets this cast's field schema and also remembers a private copy of it.
+  * The copy lets getFieldSchema() rebuild the same field schema after the
+  * computed one has been unset, instead of falling back to a bare
+  * new FieldSchema(null, mType).
+  *
+  * @param fs the field schema to set; may be null, in which case no copy is
+  *           remembered and getFieldSchema() uses its mType-based fallback
+  * @throws FrontendException propagated from the superclass implementation
+  */
+ @Override
+ public void setFieldSchema(FieldSchema fs) throws FrontendException {
+     super.setFieldSchema(fs);
+     // Copy (via FieldSchema's copy constructor) rather than alias, so later
+     // changes to the caller's object do not leak into the remembered schema.
+     // A null argument leaves nothing to remember: keep the field null so the
+     // mType fallback in getFieldSchema() applies.
+     userSpecifiedFieldSchema = (fs == null) ? null : new Schema.FieldSchema(fs);
+ }
@Override
public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- mFieldSchema = new Schema.FieldSchema(null, mType);
+ if(userSpecifiedFieldSchema != null) {
+ mFieldSchema = userSpecifiedFieldSchema;
+ } else {
+ mFieldSchema = new Schema.FieldSchema(null, mType);
+ }
Schema.FieldSchema parFs = getExpression().getFieldSchema();
String canonicalName = (parFs != null ? parFs.canonicalName :
null);
mFieldSchema.setParent(canonicalName, getExpression());
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=900926&r1=900925&r2=900926&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Tue Jan 19 19:39:51 2010
@@ -37,6 +37,7 @@
* the logical binary expression operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(BinaryExpressionOperator binOp)
throws VisitorException {
binOp.unsetFieldSchema();
@@ -49,6 +50,7 @@
* the logical unary operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(UnaryExpressionOperator uniOp) throws
VisitorException {
uniOp.unsetFieldSchema();
super.visit(uniOp);
@@ -60,6 +62,7 @@
* the logical cogroup operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOCogroup cg) throws VisitorException {
cg.unsetSchema();
super.visit(cg);
@@ -71,6 +74,7 @@
* the logical sort operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOSort s) throws VisitorException {
s.unsetSchema();
super.visit(s);
@@ -82,6 +86,7 @@
* the logical limit operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOLimit limit) throws VisitorException {
limit.unsetSchema();
super.visit(limit);
@@ -94,6 +99,7 @@
* the logical filter operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOFilter filter) throws VisitorException {
filter.unsetSchema();
super.visit(filter);
@@ -105,6 +111,7 @@
* the logical split operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOSplit split) throws VisitorException {
split.unsetSchema();
super.visit(split);
@@ -116,6 +123,7 @@
* the logical foreach operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOForEach forEach) throws VisitorException {
forEach.unsetSchema();
super.visit(forEach);
@@ -128,6 +136,7 @@
* the user defined function
* @throws VisitorException
*/
+ @Override
protected void visit(LOUserFunc func) throws VisitorException {
func.unsetFieldSchema();
super.visit(func);
@@ -138,6 +147,7 @@
* the logical binCond operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOBinCond binCond) throws VisitorException {
binCond.unsetFieldSchema();
super.visit(binCond);
@@ -149,64 +159,185 @@
* the logical cast operator that has to be visited
* @throws VisitorException
*/
+ @Override
protected void visit(LOCast castOp) throws VisitorException {
+     // Clear the cast's field schema before continuing the normal traversal.
+     castOp.unsetFieldSchema();
    super.visit(castOp);
}
+
/**
 * Clears the field schema of a regexp operator and then continues the
 * traversal.
 *
 * @param regexp
 *            the logical regexp operator that has to be visited
 * @throws VisitorException
 *             propagated from the superclass visit
 */
+ @Override
protected void visit(LORegexp regexp) throws VisitorException {
    regexp.unsetFieldSchema();
    super.visit(regexp);
}
+ @Override
protected void visit(LOLoad loadOp) throws VisitorException {
    // The load's schema is intentionally left in place: it will not change,
    // and there is no way to recover it once removed.
    super.visit(loadOp);
}
+ @Override
protected void visit(LOStore storeOp) throws VisitorException {
    storeOp.unsetSchema();
    super.visit(storeOp);
}
+ @Override
protected void visit(LOConst constOp) throws VisitorException {
    constOp.unsetSchema();
    super.visit(constOp);
}
+ @Override
protected void visit(LOUnion unionOp) throws VisitorException {
    unionOp.unsetSchema();
    super.visit(unionOp);
}
+ @Override
protected void visit(LOSplitOutput splitOutputOp) throws VisitorException {
    splitOutputOp.unsetSchema();
    super.visit(splitOutputOp);
}
+ @Override
protected void visit(LODistinct distinctOp) throws VisitorException {
    distinctOp.unsetSchema();
    super.visit(distinctOp);
}
+ @Override
protected void visit(LOCross crossOp) throws VisitorException {
    crossOp.unsetSchema();
    super.visit(crossOp);
}
+ @Override
protected void visit(LOProject projectOp) throws VisitorException {
    // Projections are reset via unsetFieldSchema(), not unsetSchema().
    projectOp.unsetFieldSchema();
    super.visit(projectOp);
}
+ @Override
protected void visit(LOJoin joinOp) throws VisitorException {
    joinOp.unsetSchema();
    super.visit(joinOp);
}
+
+ @Override
+ protected void visit(ExpressionOperator exprOp) throws VisitorException {
+     // Each override in this group clears the operator's field schema and
+     // then delegates to the superclass visit.
+     exprOp.unsetFieldSchema();
+     super.visit(exprOp);
+ }
+
+ @Override
+ public void visit(LOAdd addOp) throws VisitorException {
+     addOp.unsetFieldSchema();
+     super.visit(addOp);
+ }
+
+ @Override
+ public void visit(LOAnd andOp) throws VisitorException {
+     andOp.unsetFieldSchema();
+     super.visit(andOp);
+ }
+
+ @Override
+ public void visit(LODivide divideOp) throws VisitorException {
+     divideOp.unsetFieldSchema();
+     super.visit(divideOp);
+ }
+
+ @Override
+ public void visit(LOEqual equalOp) throws VisitorException {
+     equalOp.unsetFieldSchema();
+     super.visit(equalOp);
+ }
+
+ @Override
+ public void visit(LOGreaterThan gtOp) throws VisitorException {
+     gtOp.unsetFieldSchema();
+     super.visit(gtOp);
+ }
+
+ @Override
+ public void visit(LOGreaterThanEqual gteOp) throws VisitorException {
+     gteOp.unsetFieldSchema();
+     super.visit(gteOp);
+ }
+
+ @Override
+ public void visit(LOIsNull isNullOp) throws VisitorException {
+     isNullOp.unsetFieldSchema();
+     super.visit(isNullOp);
+ }
+
+ @Override
+ public void visit(LOLesserThan ltOp) throws VisitorException {
+     ltOp.unsetFieldSchema();
+     super.visit(ltOp);
+ }
+
+ @Override
+ public void visit(LOLesserThanEqual lteOp) throws VisitorException {
+     lteOp.unsetFieldSchema();
+     super.visit(lteOp);
+ }
+
+ @Override
+ public void visit(LOMapLookup lookupOp) throws VisitorException {
+     lookupOp.unsetFieldSchema();
+     super.visit(lookupOp);
+ }
+
+ @Override
+ public void visit(LOMod modOp) throws VisitorException {
+     modOp.unsetFieldSchema();
+     super.visit(modOp);
+ }
+
+ @Override
+ public void visit(LOMultiply multiplyOp) throws VisitorException {
+     multiplyOp.unsetFieldSchema();
+     super.visit(multiplyOp);
+ }
+
+ @Override
+ public void visit(LONegative negativeOp) throws VisitorException {
+     negativeOp.unsetFieldSchema();
+     super.visit(negativeOp);
+ }
+
+ @Override
+ public void visit(LONot notOp) throws VisitorException {
+     notOp.unsetFieldSchema();
+     super.visit(notOp);
+ }
+
+ @Override
+ public void visit(LONotEqual notEqualOp) throws VisitorException {
+     notEqualOp.unsetFieldSchema();
+     super.visit(notEqualOp);
+ }
+
+ @Override
+ public void visit(LOOr orOp) throws VisitorException {
+     orOp.unsetFieldSchema();
+     super.visit(orOp);
+ }
+
+ @Override
+ public void visit(LOSubtract subtractOp) throws VisitorException {
+     subtractOp.unsetFieldSchema();
+     super.visit(subtractOp);
+ }
}
Modified:
hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=900926&r1=900925&r2=900926&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Jan 19 19:39:51 2010
@@ -5487,6 +5487,79 @@
LogicalPlan plan = planTester.buildPlan("c = foreach b generate
(chararray)viewinfo#'pos' as position;") ;
// validate
+ runTypeCheckingValidator(plan);
+
+ checkLoaderInCasts(plan, "org.apache.pig.builtin.PigStorage");
+ }
+
+ /**
+ * test various scenarios with two level map lookup
+ */
+ @Test
+ public void testTwolevelMapLookupLineage() throws Exception {
+ List<String[]> queries = new ArrayList<String[]>();
+ // CASE 1: LOAD -> FILTER -> FOREACH -> LIMIT -> STORE
+ queries.add(new String[] {"sds = LOAD '/my/data/location' " +
+ "AS (simpleFields:map[], mapFields:map[],
listMapFields:map[]);",
+ "queries = FILTER sds BY
mapFields#'page_params'#'query' " +
+ "is NOT NULL;",
+ "queries_rand = FOREACH queries GENERATE " +
+ "(CHARARRAY) (mapFields#'page_params'#'query') AS
query_string;",
+ "queries_limit = LIMIT queries_rand 100;",
+ "STORE queries_limit INTO 'out';"});
+ // CASE 2: LOAD -> FOREACH -> FILTER -> LIMIT -> STORE
+ queries.add(new String[]{"sds = LOAD '/my/data/location' " +
+ "AS (simpleFields:map[], mapFields:map[],
listMapFields:map[]);",
+ "queries_rand = FOREACH sds GENERATE " +
+ "(CHARARRAY) (mapFields#'page_params'#'query') AS
query_string;",
+ "queries = FILTER queries_rand BY query_string IS NOT
null;",
+ "queries_limit = LIMIT queries 100;",
+ "STORE queries_limit INTO 'out';"});
+ // CASE 3: LOAD -> FOREACH -> FOREACH -> FILTER -> LIMIT -> STORE
+ queries.add(new String[]{"sds = LOAD '/my/data/location' " +
+ "AS (simpleFields:map[], mapFields:map[],
listMapFields:map[]);",
+ "params = FOREACH sds GENERATE " +
+ "(map[]) (mapFields#'page_params') AS params;",
+ "queries = FOREACH params " +
+ "GENERATE (CHARARRAY) (params#'query') AS
query_string;",
+ "queries_filtered = FILTER queries BY query_string IS
NOT null;",
+ "queries_limit = LIMIT queries_filtered 100;",
+ "STORE queries_limit INTO 'out';"});
+ // CASE 4: LOAD -> FOREACH -> FOREACH -> LIMIT -> STORE
+ queries.add(new String[]{"sds = LOAD '/my/data/location' " +
+ "AS (simpleFields:map[], mapFields:map[],
listMapFields:map[]);",
+ "params = FOREACH sds GENERATE" +
+ " (map[]) (mapFields#'page_params') AS params;",
+ "queries = FOREACH params GENERATE " +
+ "(CHARARRAY) (params#'query') AS query_string;",
+ "queries_limit = LIMIT queries 100;",
+ "STORE queries_limit INTO 'out';"});
+ // CASE 5: LOAD -> FOREACH -> FOREACH -> FOREACH -> LIMIT -> STORE
+ queries.add(new String[]{"sds = LOAD '/my/data/location' " +
+ "AS (simpleFields:map[], mapFields:map[],
listMapFields:map[]);",
+ "params = FOREACH sds GENERATE " +
+ "(map[]) (mapFields#'page_params') AS params;",
+ "queries = FOREACH params GENERATE " +
+ "(CHARARRAY) (params#'query') AS query_string;",
+ "rand_queries = FOREACH queries GENERATE query_string as
query;",
+ "queries_limit = LIMIT rand_queries 100;",
+ "STORE rand_queries INTO 'out';"});
+
+ for (String[] query: queries) {
+ LogicalPlan lp = null;
+ for (String queryLine : query) {
+ lp = planTester.buildPlan(queryLine);
+ }
+
+ // validate
+ runTypeCheckingValidator(lp);
+ checkLoaderInCasts(lp, "org.apache.pig.builtin.PigStorage");
+
+ }
+ }
+
+ private void runTypeCheckingValidator(LogicalPlan plan) throws
+ PlanValidationException {
CompilationMessageCollector collector = new
CompilationMessageCollector() ;
TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
typeValidator.validate(plan, collector) ;
@@ -5498,12 +5571,16 @@
if (collector.hasError()) {
throw new AssertionError("Expect no error") ;
}
-
+ }
+
+ /**
+  * Asserts that every LOCast found in the plan carries a load function spec
+  * whose class name starts with the given loader class name.
+  *
+  * @param plan the logical plan to search for casts
+  * @param loaderClassName expected load function class name (prefix match)
+  * @throws VisitorException if walking the plan with CastFinder fails
+  */
+ private void checkLoaderInCasts(LogicalPlan plan, String loaderClassName)
+         throws VisitorException {
    CastFinder cf = new CastFinder(plan);
    cf.visit();
    List<LOCast> casts = cf.casts;
    for (LOCast cast : casts) {
-       assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
+       // Fail with a readable message instead of a NullPointerException
+       // when a cast has no load function attached.
+       assertTrue("cast " + cast + " has no load function spec",
+               cast.getLoadFuncSpec() != null);
+       assertTrue("cast " + cast + " uses load function "
+               + cast.getLoadFuncSpec().getClassName()
+               + ", expected " + loaderClassName,
+               cast.getLoadFuncSpec().getClassName().startsWith(loaderClassName));
    }
}
@@ -5662,6 +5739,7 @@
*/
public static class TestBinCondFieldSchema extends EvalFunc<DataBag> {
//no-op exec method
+ @Override
public DataBag exec(Tuple input) {
    // Intentionally a no-op: the input is ignored and no bag is produced.
    final DataBag noResult = null;
    return noResult;
}