Author: daijy Date: Fri Aug 6 17:24:52 2010 New Revision: 983062 URL: http://svn.apache.org/viewvc?rev=983062&view=rev Log: PIG-1496: Mandatory rule ImplicitSplitInserter
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=983062&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Fri Aug 6 17:24:52 2010 @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.newplan.logical.rules; + +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; + +import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ConstantExpression; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.newplan.optimizer.Transformer; +import org.apache.pig.newplan.logical.relational.LOSplit; +import org.apache.pig.newplan.logical.relational.LOStore; +import org.apache.pig.newplan.logical.relational.LOSplitOutput; +import org.apache.pig.newplan.logical.relational.LogicalSchema; + + +/** + * Super class for all rules that operates on the whole plan. It doesn't look for + * a specific pattern. An example of such kind rule is ColumnPrune. + * + */ +public class ImplicitSplitInserter extends Rule { + + public ImplicitSplitInserter(String n) { + super(n, true); + } + + @Override + public List<OperatorPlan> match(OperatorPlan plan) throws FrontendException { + // Look to see if this is a non-split node with two outputs. If so + // it matches. + currentPlan = plan; + List<OperatorPlan> ll = new ArrayList<OperatorPlan>(); + Iterator<Operator> ops = plan.getOperators(); + while (ops.hasNext()) { + Operator op = ops.next(); + if (op instanceof LOSplit || op instanceof LOStore) + continue; + List<Operator> succs = plan.getSuccessors(op); + if (succs != null && succs.size() >= 2) { + OperatorPlan match = new LogicalPlan(); + match.add(op); + ll.add(match); + } + } + return ll; + } + + @Override + public Transformer getNewTransformer() { + return new ImplicitSplitInserterTransformer(); + } + + public class ImplicitSplitInserterTransformer extends Transformer { + @Override + public boolean check(OperatorPlan matched) throws FrontendException { + return true; + } + + @Override + public void transform(OperatorPlan matched) throws FrontendException { + if (matched == null || matched instanceof LOSplit || matched instanceof LOStore + || matched.size() != 1) + throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2244); + + // For two successors of op here is a pictorial + // representation of the change required: + // BEFORE: + // Succ1 Succ2 + // \ / + // op + + // SHOULD BECOME: + + // AFTER: + // Succ1 Succ2 + // | | + // SplitOutput SplitOutput + // \ / + // Split + // | + // op + + Operator op = matched.getSources().get(0); + List<Operator> succs = currentPlan.getSuccessors(op); + if (succs == null || succs.size() < 2) + throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2243); + LOSplit splitOp = new LOSplit(currentPlan); + splitOp.setAlias(((LogicalRelationalOperator) op).getAlias()); + Operator[] sucs = succs.toArray(new Operator[0]); + currentPlan.add(splitOp); + currentPlan.connect(op, splitOp); + for (Operator suc : sucs) { + // position is remembered in order to maintain the order of the successors + Pair<Integer, Integer> pos = currentPlan.disconnect(op, suc); + LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); + LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BOOLEAN); + new ConstantExpression(filterPlan, Boolean.valueOf(true), fs); + LOSplitOutput splitOutput = new LOSplitOutput((LogicalPlan) currentPlan, filterPlan); + splitOutput.setAlias(splitOp.getAlias()); + currentPlan.add(splitOutput); + currentPlan.connect(splitOp, splitOutput); + currentPlan.connect(splitOutput, pos.first, suc, pos.second); + } + } + + @Override + public OperatorPlan reportChanges() { + return currentPlan; + } + } + + @Override + protected OperatorPlan buildPattern() { + return null; + } +} Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java?rev=983062&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java Fri Aug 6 17:24:52 2010 @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + + +import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector; +import static org.apache.pig.test.utils.TypeCheckingTestUtil.printTypeGraph; +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.logicalLayer.PlanSetter; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator; +import org.apache.pig.impl.plan.CompilationMessageCollector; +import org.apache.pig.test.utils.LogicalPlanTester; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +public class TestNewPlanImplicitSplit { + private PigServer pigServer; + static MiniCluster cluster = MiniCluster.buildCluster(); + + @Before + public void setUp() throws Exception { + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + pigServer.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true"); + } + + @After + public void tearDown() throws Exception { + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + cluster.shutDown(); + } + + @Test + public void testImplicitSplit() throws Exception{ + int LOOP_SIZE = 20; + String[] input = new String[LOOP_SIZE]; + for(int i = 1; i <= LOOP_SIZE; i++) { + input[i-1] = Integer.toString(i); + } + String inputFileName = "testImplicitSplit-input.txt"; + Util.createInputFile(cluster, inputFileName, input); + pigServer.registerQuery("A = LOAD '" + inputFileName + "';"); + pigServer.registerQuery("B = filter A by $0<=10;"); + pigServer.registerQuery("C = filter A by $0>10;"); + pigServer.registerQuery("D = union B,C;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + if(!iter.hasNext()) fail("No Output received"); + int cnt = 0; + while(iter.hasNext()){ + Tuple t = iter.next(); + ++cnt; + } + assertEquals(20, cnt); + Util.deleteFile(cluster, inputFileName); + } + + @Test + public void testImplicitSplitInCoGroup() throws Exception { + // this query is similar to the one reported in JIRA - PIG-537 + // Create input file + String input1 = "testImplicitSplitInCoGroup-input1.txt"; + String input2 = "testImplicitSplitInCoGroup-input2.txt"; + Util.createInputFile(cluster, input1, + new String[] {"a:1", "b:2", "b:20", "c:3", "c:30"}); + Util.createInputFile(cluster, input2, + new String[] {"a:first", "b:second", "c:third"}); + pigServer.registerQuery("a = load '" + input1 + + "' using PigStorage(':') as (name:chararray, marks:int);"); + pigServer.registerQuery("b = load '" + input2 + + "' using PigStorage(':') as (name:chararray, rank:chararray);"); + pigServer.registerQuery("c = cogroup a by name, b by name;"); + pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;"); + pigServer.registerQuery("e = cogroup a by marks, d by newmarks;"); + pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);"); + HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>(); + results.put(1, new Object[] { "a", 1, "a", 1 }); + results.put(2, new Object[] { "b", 2, "b", 2 }); + results.put(3, new Object[] { "c", 3, "c", 3 }); + results.put(20, new Object[] { "b", 20, "b", 20 }); + results.put(30, new Object[] { "c", 30, "c", 30 }); + pigServer.explain("f", System.out); + + Iterator<Tuple> it = pigServer.openIterator("f"); + while(it.hasNext()) { + Tuple t = it.next(); + System.err.println("Tuple:" + t); + Integer group = (Integer)t.get(0); + Object[] groupValues = results.get(group); + for(int i = 0; i < 4; i++) { + assertEquals(groupValues[i], t.get(i+1)); + } + } + Util.deleteFile(cluster, input1); + Util.deleteFile(cluster, input2); + } + + @Test + public void testImplicitSplitInCoGroup2() throws Exception { + // this query is similar to the one reported in JIRA - PIG-537 + LogicalPlanTester planTester = new LogicalPlanTester(); + planTester.buildPlan("a = load 'file1' using PigStorage(':') as (name:chararray, marks:int);"); + planTester.buildPlan("b = load 'file2' using PigStorage(':') as (name:chararray, rank:chararray);"); + planTester.buildPlan("c = cogroup a by name, b by name;"); + planTester.buildPlan("d = foreach c generate group, FLATTEN(a.marks) as newmarks;"); + planTester.buildPlan("e = cogroup a by marks, d by newmarks;"); + LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, flatten(a), flatten(d);"); + + // Set the logical plan values correctly in all the operators + PlanSetter ps = new PlanSetter(plan); + ps.visit(); + + // run through validator + CompilationMessageCollector collector = new CompilationMessageCollector() ; + TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; + typeValidator.validate(plan, collector) ; + printMessageCollector(collector) ; + printTypeGraph(plan) ; + + if (collector.hasError()) { + throw new Exception("Error during type checking") ; + } + + // this will run ImplicitSplitInserter + TestLogicalOptimizer.optimizePlan(plan); + + // get Schema of leaf and compare: + Schema expectedSchema = Util.getSchemaFromString("grp: int,A::username: chararray,A::marks: int,AB::group: chararray,AB::newmarks: int"); + assertTrue(Schema.equals(expectedSchema, plan.getLeaves().get(0).getSchema(),false, true)); + } +}