Author: gates Date: Tue Dec 1 21:03:37 2009 New Revision: 885929 URL: http://svn.apache.org/viewvc?rev=885929&view=rev Log: PIG-990 Provide a way to pin LogicalOperator Options.
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Dec 1 21:03:37 2009 @@ -24,6 +24,8 @@ IMPROVEMENTS +PIG-990: Provide a way to pin LogicalOperator Options (dvryaboy via gates) + PIG-1103: refactoring of commit tests (olgan) PIG-1101: Allow arugment to limit to be long in addition to int (ashutoshc via Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Dec 1 21:03:37 2009 @@ -969,7 +969,7 @@ logToPhyMap.put(loj, smj); return; } - else if (loj.getJoinType() == LOJoin.JOINTYPE.REGULAR){ + else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){ POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( scope, nodeGen.getNextNodeId(scope)), 
loj .getRequestedParallelism()); Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Dec 1 21:03:37 2009 @@ -64,6 +64,11 @@ private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans; private GROUPTYPE mGroupType; + /** + * static constant to refer to the option of selecting a group type + */ + public final static Integer OPTION_GROUPTYPE = 1; + /** * * @param plan Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Tue Dec 1 21:03:37 2009 @@ -49,12 +49,13 @@ * Enum for the type of join */ public static enum JOINTYPE { - REGULAR, // Regular join + HASH, // Hash Join REPLICATED, // Fragment Replicated join SKEWED, // Skewed Join MERGE // Sort Merge Join }; + /** * LOJoin contains a list of logical operators corresponding to the * relational operators and a list of generates for each relational @@ -66,7 +67,12 @@ private boolean[] mInnerFlags; private JOINTYPE mJoinType; // Retains the type of the join - /** + /** + * static constant to refer to the option of selecting a join type + */ + public final static Integer OPTION_JOIN = 1; + + /** * * @param plan * LogicalPlan this operator is a part of. 
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Dec 1 21:03:37 2009 @@ -19,6 +19,7 @@ package org.apache.pig.impl.logicalLayer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.io.IOException; @@ -83,6 +84,13 @@ */ protected boolean mIsProjectionMapComputed = false; + /** + * A HashSet to indicate whether an option (such as a Join Type) was pinned + * by the user or can be chosen at runtime by the optimizer. + */ + protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>(); + + private static Log log = LogFactory.getLog(LogicalOperator.class); /** @@ -218,6 +226,14 @@ mRequestedParallelism = newRequestedParallelism; } + public void pinOption(Integer opt) { + mPinnedOptions.add(opt); + } + + public boolean isPinnedOption(Integer opt) { + return mPinnedOptions.contains(opt); + } + @Override public String toString() { StringBuffer msg = new StringBuffer(); Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=885929&r1=885928&r2=885929&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Dec 1 21:03:37 2009 @@ -1476,10 +1476,10 @@ { ( input = NestedExpr(lp) {log.debug("Limit input: " + input);} - ( - t = <INTEGER> { 
l = Long.parseLong(t.image); } + ( + t = <INTEGER> { l = Long.parseLong(t.image); } | t = <LONGINTEGER> { l = Long.parseLong(t.image.substring(0, t.image.length() - 1)); } - ) + ) ) { LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, getNextId()), l); @@ -1748,6 +1748,11 @@ throw new ParseException("Collected group is only supported for columns or star projection"); } cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED); + cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE); + } + |"\"regular\"" { + cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR); + cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE); } ) ] @@ -2044,9 +2049,7 @@ ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); log.trace("Entering JoinClause"); log.debug("LogicalPlan: " + lp); - LogicalOperator frj = null; - LogicalOperator skj = null; - LogicalOperator smj = null; + LogicalOperator joinOp = null; boolean isLeftOuter = false; boolean isRightOuter = false; boolean isFullOuter = false; @@ -2104,39 +2107,45 @@ if(isFullOuter || isRightOuter) { throw new ParseException("Replicated join does not support (right|full) outer joins"); } - frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); + joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); + joinOp.pinOption(LOJoin.OPTION_JOIN); } - | "\"repl\"" { - if(isFullOuter || isRightOuter) { + | "\"repl\"" { + if(isFullOuter || isRightOuter) { throw new ParseException("Replicated join does not support (right|full) outer joins"); } - frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); - } + joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); + joinOp.pinOption(LOJoin.OPTION_JOIN); + } |"\"skewed\"" { - skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); + joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); + joinOp.pinOption(LOJoin.OPTION_JOIN); } |"\"merge\"" { if(isOuter) { throw new ParseException("Merge join does not support (left|right|full) outer joins"); } - smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); + 
joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); + joinOp.pinOption(LOJoin.OPTION_JOIN); + } + |"\"hash\"" { + joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH); + joinOp.pinOption(LOJoin.OPTION_JOIN); + } + |"\"default\"" { + joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH); + joinOp.pinOption(LOJoin.OPTION_JOIN); }) ] ) ) {log.trace("Exiting JoinClause"); - if (frj!=null) { - return frj; - } - else if (skj!=null) { - return skj; + if (joinOp!=null) { + return joinOp; } - else if (smj!=null) { - return smj; - } else { - return parseJoin(gis, lp, LOJoin.JOINTYPE.REGULAR); + return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH); }} } Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java?rev=885929&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java Tue Dec 1 21:03:37 2009 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + +import java.io.IOException; + +import org.apache.pig.PigServer; +import org.apache.pig.impl.logicalLayer.LOCogroup; +import org.apache.pig.impl.logicalLayer.LOJoin; +import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.logicalLayer.LOJoin; +import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.test.utils.*; +import org.junit.Before; +import org.junit.Test; +import static org.apache.pig.ExecType.LOCAL; + +import junit.framework.TestCase; + +public class TestPinOptions extends TestCase { + + protected PigServer pigServer; + @Before + @Override + protected void setUp() throws Exception { + pigServer = new PigServer(LOCAL); + } + + @Test + public void testPinnedJoinOption() throws IOException { + String[] joinTypes = {"hash", "repl", "merge", "skewed", "default"}; + String[] expectedJoinTypes = {"HASH", "REPLICATED", "MERGE", "SKEWED", "HASH"}; + for (int i = 0; i < joinTypes.length; i++) { + pigServer.setBatchOn(); + pigServer.registerQuery("a = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("b = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("c = join a by foo, b by foo using \""+joinTypes[i]+"\";"); + LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c"); + assertTrue("did "+joinTypes[i]+" join get pinned? ", + op.isPinnedOption(LOJoin.OPTION_JOIN)); + assertEquals("did the right join type get set? 
", + ((LOJoin) op).getJoinType().toString(), expectedJoinTypes[i]); + pigServer.discardBatch(); + } + } + + @Test + public void testNotPinnedJinOption() throws IOException { + pigServer.registerQuery("a = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("b = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("c = join a by foo, b by foo;"); + LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c"); + assertEquals("default join should be hash", + ((LOJoin) op).getJoinType().toString(), "HASH"); + assertFalse(op.isPinnedOption(LOJoin.OPTION_JOIN)); + } + + @Test + public void testGroupOptions() throws IOException { + pigServer.setBatchOn(); + pigServer.registerQuery("a = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("b = group a by foo;"); + + LogicalOperator op = getOpByAlias(pigServer.getAliases().get("b"), "b"); + assertFalse(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE)); + pigServer.discardBatch(); + + pigServer.setBatchOn(); + pigServer.registerQuery("a = load '/tmp' as (foo, bar);"); + pigServer.registerQuery("b = group a by foo using \"collected\";"); + op = getOpByAlias(pigServer.getAliases().get("b"), "b"); + assertTrue(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE)); + pigServer.discardBatch(); + } + + private LogicalOperator getOpByAlias(LogicalPlan lp, String alias) { + for (LogicalOperator op : lp) { + if (op.getAlias().equals(alias)) return op; + } + return null; + } + +}