Author: gates
Date: Tue Dec 1 21:03:37 2009
New Revision: 885929
URL: http://svn.apache.org/viewvc?rev=885929&view=rev
Log:
PIG-990 Provide a way to pin LogicalOperator Options.
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Dec 1 21:03:37 2009
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-990: Provide a way to pin LogicalOperator Options (dvryaboy via gates)
+
PIG-1103: refactoring of commit tests (olgan)
PIG-1101: Allow argument to limit to be long in addition to int (ashutoshc via
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
Tue Dec 1 21:03:37 2009
@@ -969,7 +969,7 @@
logToPhyMap.put(loj, smj);
return;
}
- else if (loj.getJoinType() == LOJoin.JOINTYPE.REGULAR){
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
POGlobalRearrange poGlobal = new POGlobalRearrange(new
OperatorKey(
scope, nodeGen.getNextNodeId(scope)), loj
.getRequestedParallelism());
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue
Dec 1 21:03:37 2009
@@ -64,6 +64,11 @@
private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans;
private GROUPTYPE mGroupType;
+ /**
+ * static constant to refer to the option of selecting a group type
+ */
+ public final static Integer OPTION_GROUPTYPE = 1;
+
/**
*
* @param plan
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Tue Dec
1 21:03:37 2009
@@ -49,12 +49,13 @@
* Enum for the type of join
*/
public static enum JOINTYPE {
- REGULAR, // Regular join
+ HASH, // Hash Join
REPLICATED, // Fragment Replicated join
SKEWED, // Skewed Join
MERGE // Sort Merge Join
};
+
/**
* LOJoin contains a list of logical operators corresponding to the
* relational operators and a list of generates for each relational
@@ -66,7 +67,12 @@
private boolean[] mInnerFlags;
private JOINTYPE mJoinType; // Retains the type of the join
- /**
+ /**
+ * static constant to refer to the option of selecting a join type
+ */
+ public final static Integer OPTION_JOIN = 1;
+
+ /**
*
* @param plan
* LogicalPlan this operator is a part of.
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
Tue Dec 1 21:03:37 2009
@@ -19,6 +19,7 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.io.IOException;
@@ -83,6 +84,13 @@
*/
protected boolean mIsProjectionMapComputed = false;
+ /**
+ * A HashSet to indicate whether an option (such as a Join Type) was pinned
+ * by the user or can be chosen at runtime by the optimizer.
+ */
+ protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();
+
+
private static Log log = LogFactory.getLog(LogicalOperator.class);
/**
@@ -218,6 +226,14 @@
mRequestedParallelism = newRequestedParallelism;
}
+ public void pinOption(Integer opt) {
+ mPinnedOptions.add(opt);
+ }
+
+ public boolean isPinnedOption(Integer opt) {
+ return mPinnedOptions.contains(opt);
+ }
+
@Override
public String toString() {
StringBuffer msg = new StringBuffer();
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Tue Dec 1 21:03:37 2009
@@ -1476,10 +1476,10 @@
{
(
input = NestedExpr(lp) {log.debug("Limit input: " + input);}
- (
- t = <INTEGER> { l = Long.parseLong(t.image); }
+ (
+ t = <INTEGER> { l = Long.parseLong(t.image); }
| t = <LONGINTEGER> { l = Long.parseLong(t.image.substring(0,
t.image.length() - 1)); }
- )
+ )
)
{
LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope,
getNextId()), l);
@@ -1748,6 +1748,11 @@
throw new ParseException("Collected group is only supported for columns or star projection");
}
cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
+ cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+ }
+ |"\"regular\"" {
+ cogroup = parseCogroup(gis, lp,
LOCogroup.GROUPTYPE.REGULAR);
+ cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
}
)
]
@@ -2044,9 +2049,7 @@
ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
log.trace("Entering JoinClause");
log.debug("LogicalPlan: " + lp);
- LogicalOperator frj = null;
- LogicalOperator skj = null;
- LogicalOperator smj = null;
+ LogicalOperator joinOp = null;
boolean isLeftOuter = false;
boolean isRightOuter = false;
boolean isFullOuter = false;
@@ -2104,39 +2107,45 @@
if(isFullOuter || isRightOuter) {
throw new ParseException("Replicated join does not support (right|full) outer joins");
}
- frj = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
+ joinOp = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
}
- | "\"repl\"" {
- if(isFullOuter || isRightOuter) {
+ | "\"repl\"" {
+ if(isFullOuter || isRightOuter) {
throw new ParseException("Replicated join does not support (right|full) outer joins");
}
- frj=parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
- }
+ joinOp = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
+ }
|"\"skewed\"" {
- skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
+ joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
}
|"\"merge\"" {
if(isOuter) {
throw new ParseException("Merge join does not support (left|right|full) outer joins");
}
- smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
+ joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
+ }
+ |"\"hash\"" {
+ joinOp = parseJoin(gis, lp,
LOJoin.JOINTYPE.HASH);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
+ }
+ |"\"default\"" {
+ joinOp = parseJoin(gis, lp,
LOJoin.JOINTYPE.HASH);
+ joinOp.pinOption(LOJoin.OPTION_JOIN);
})
]
)
)
{log.trace("Exiting JoinClause");
- if (frj!=null) {
- return frj;
- }
- else if (skj!=null) {
- return skj;
+ if (joinOp!=null) {
+ return joinOp;
}
- else if (smj!=null) {
- return smj;
- }
else {
- return parseJoin(gis, lp, LOJoin.JOINTYPE.REGULAR);
+ return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
}}
}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java?rev=885929&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java Tue Dec 1
21:03:37 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.apache.pig.ExecType.LOCAL;
+
+import junit.framework.TestCase;
+
+public class TestPinOptions extends TestCase {
+
+ protected PigServer pigServer;
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ pigServer = new PigServer(LOCAL);
+ }
+
+ @Test
+ public void testPinnedJoinOption() throws IOException {
+ String[] joinTypes = {"hash", "repl", "merge", "skewed", "default"};
+ String[] expectedJoinTypes = {"HASH", "REPLICATED", "MERGE", "SKEWED", "HASH"};
+ for (int i = 0; i < joinTypes.length; i++) {
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("b = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("c = join a by foo, b by foo using \""+joinTypes[i]+"\";");
+ LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c");
+ assertTrue("did "+joinTypes[i]+" join get pinned? ",
+ op.isPinnedOption(LOJoin.OPTION_JOIN));
+ assertEquals("did the right join type get set? ",
+ ((LOJoin) op).getJoinType().toString(), expectedJoinTypes[i]);
+ pigServer.discardBatch();
+ }
+ }
+
+ @Test
+ public void testNotPinnedJinOption() throws IOException {
+ pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("b = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("c = join a by foo, b by foo;");
+ LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c");
+ assertEquals("default join should be hash",
+ ((LOJoin) op).getJoinType().toString(), "HASH");
+ assertFalse(op.isPinnedOption(LOJoin.OPTION_JOIN));
+ }
+
+ @Test
+ public void testGroupOptions() throws IOException {
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("b = group a by foo;");
+
+ LogicalOperator op = getOpByAlias(pigServer.getAliases().get("b"), "b");
+ assertFalse(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE));
+ pigServer.discardBatch();
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+ pigServer.registerQuery("b = group a by foo using \"collected\";");
+ op = getOpByAlias(pigServer.getAliases().get("b"), "b");
+ assertTrue(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE));
+ pigServer.discardBatch();
+ }
+
+ private LogicalOperator getOpByAlias(LogicalPlan lp, String alias) {
+ for (LogicalOperator op : lp) {
+ if (op.getAlias().equals(alias)) return op;
+ }
+ return null;
+ }
+
+}