Author: gates
Date: Tue Dec  1 21:03:37 2009
New Revision: 885929

URL: http://svn.apache.org/viewvc?rev=885929&view=rev
Log:
PIG-990 Provide a way to pin LogicalOperator Options.

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Dec  1 21:03:37 2009
@@ -24,6 +24,8 @@
 
 IMPROVEMENTS
 
+PIG-990:  Provide a way to pin LogicalOperator Options (dvryaboy via gates)
+
 PIG-1103: refactoring of commit tests (olgan)
 
 PIG-1101: Allow arugment to limit to be long in addition to int (ashutoshc via

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Dec  1 21:03:37 2009
@@ -969,7 +969,7 @@
             logToPhyMap.put(loj, smj);
             return;
         }
-               else if (loj.getJoinType() == LOJoin.JOINTYPE.REGULAR){
+               else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
                POGlobalRearrange poGlobal = new POGlobalRearrange(new 
OperatorKey(
                        scope, nodeGen.getNextNodeId(scope)), loj
                        .getRequestedParallelism());

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Dec  1 21:03:37 2009
@@ -64,6 +64,11 @@
     private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans;
     private GROUPTYPE mGroupType;
 
+    /** 
+     * static constant to refer to the option of selecting a group type
+     */
+    public final static Integer OPTION_GROUPTYPE = 1;
+
     /**
      * 
      * @param plan

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Tue Dec  1 21:03:37 2009
@@ -49,12 +49,13 @@
      * Enum for the type of join
      */
        public static enum JOINTYPE {
-        REGULAR, // Regular join
+        HASH,    // Hash Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE   // Sort Merge Join
     };
 
+    
     /**
      * LOJoin contains a list of logical operators corresponding to the
      * relational operators and a list of generates for each relational
@@ -66,7 +67,12 @@
     private boolean[] mInnerFlags;
        private JOINTYPE mJoinType; // Retains the type of the join
 
-    /**
+       /** 
+        * static constant to refer to the option of selecting a join type
+        */
+       public final static Integer OPTION_JOIN = 1;
+       
+       /**
      * 
      * @param plan
      *            LogicalPlan this operator is a part of.

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Dec  1 21:03:37 2009
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.io.IOException;
@@ -83,6 +84,13 @@
      */
     protected boolean mIsProjectionMapComputed = false;
     
+    /**
+     * A HashSet to indicate whether an option (such as Join Type) was pinned
+     * by the user or can be chosen at runtime by the optimizer.
+     */
+    protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();
+
+    
     private static Log log = LogFactory.getLog(LogicalOperator.class);
 
     /**
@@ -218,6 +226,14 @@
         mRequestedParallelism = newRequestedParallelism;
     }
 
+    public void pinOption(Integer opt) {
+        mPinnedOptions.add(opt);
+    }
+    
+    public boolean isPinnedOption(Integer opt) {
+        return mPinnedOptions.contains(opt);
+    }
+    
     @Override
     public String toString() {
         StringBuffer msg = new StringBuffer();

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=885929&r1=885928&r2=885929&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Dec  1 21:03:37 2009
@@ -1476,10 +1476,10 @@
 {
         (
         input = NestedExpr(lp) {log.debug("Limit input: " + input);}
-                (
-                   t = <INTEGER>     { l = Long.parseLong(t.image); }
+                (
+                   t = <INTEGER>     { l = Long.parseLong(t.image); }
           | t = <LONGINTEGER> { l = Long.parseLong(t.image.substring(0, 
t.image.length() - 1)); }
-         )
+                )
         )
         {
                 LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, 
getNextId()), l);
@@ -1748,6 +1748,11 @@
                     throw new ParseException("Collected group is only 
supported for columns or star projection");
                 }
                 cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
+                cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+                }
+                |"\"regular\"" {
+                    cogroup = parseCogroup(gis, lp, 
LOCogroup.GROUPTYPE.REGULAR);
+                    cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
                 }
                 )
             ]                                                                  
      
@@ -2044,9 +2049,7 @@
        ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
        log.trace("Entering JoinClause");
        log.debug("LogicalPlan: " + lp);
-       LogicalOperator frj = null;
-       LogicalOperator skj = null;
-       LogicalOperator smj = null;
+       LogicalOperator joinOp = null;
        boolean isLeftOuter = false;
        boolean isRightOuter = false;
        boolean isFullOuter = false;
@@ -2104,39 +2107,45 @@
                  if(isFullOuter || isRightOuter) {
                      throw new ParseException("Replicated join does not 
support (right|full) outer joins");
                  }
-                                   frj = parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED); 
+                                   joinOp = parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED); 
+                                   joinOp.pinOption(LOJoin.OPTION_JOIN); 
                            }
-                       | "\"repl\"" { 
-                                   if(isFullOuter || isRightOuter) {
+                       | "\"repl\"" {
+                  if(isFullOuter || isRightOuter) {
                            throw new ParseException("Replicated join does not 
support (right|full) outer joins");
                  }
-                                   frj=parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED);
-                               }
+                                   joinOp = parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED);
+                                   joinOp.pinOption(LOJoin.OPTION_JOIN);
+                  }
                    |"\"skewed\"" {
-                           skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); 
+                           joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
+                           joinOp.pinOption(LOJoin.OPTION_JOIN);
                        }
                    |"\"merge\"" { 
                            if(isOuter) {
                         throw new ParseException("Merge join does not support 
(left|right|full) outer joins");
                     }
-                           smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); 
+                           joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
+                           joinOp.pinOption(LOJoin.OPTION_JOIN); 
+                       }
+                   |"\"hash\"" {
+                               joinOp = parseJoin(gis, lp, 
LOJoin.JOINTYPE.HASH);
+                               joinOp.pinOption(LOJoin.OPTION_JOIN);
+                       }
+                   |"\"default\"" {
+                               joinOp = parseJoin(gis, lp, 
LOJoin.JOINTYPE.HASH);
+                               joinOp.pinOption(LOJoin.OPTION_JOIN);
                        })
            ] 
     )
     )
 
        {log.trace("Exiting JoinClause");
-       if (frj!=null) {
-               return frj;
-       }
-       else if (skj!=null) {
-               return skj;
+       if (joinOp!=null) {
+               return joinOp;
        }
-       else if (smj!=null) {
-        return smj;
-    }
        else {
-               return parseJoin(gis, lp, LOJoin.JOINTYPE.REGULAR);
+               return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
        }}
        
 }

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java?rev=885929&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPinOptions.java Tue Dec  1 21:03:37 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.apache.pig.ExecType.LOCAL;
+
+import junit.framework.TestCase;
+
+public class TestPinOptions extends TestCase {
+
+    protected PigServer pigServer;
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        pigServer = new PigServer(LOCAL);
+    }
+        
+    @Test
+    public void testPinnedJoinOption() throws IOException {
+        String[] joinTypes = {"hash", "repl", "merge", "skewed", "default"};
+        String[] expectedJoinTypes = {"HASH", "REPLICATED", "MERGE", "SKEWED", "HASH"};
+        for (int i = 0; i < joinTypes.length; i++) {
+            pigServer.setBatchOn();
+            pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+            pigServer.registerQuery("b = load '/tmp' as (foo, bar);");
+            pigServer.registerQuery("c = join a by foo, b by foo using \""+joinTypes[i]+"\";");
+            LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c");
+            assertTrue("did "+joinTypes[i]+" join get pinned? ", 
+                    op.isPinnedOption(LOJoin.OPTION_JOIN));
+            assertEquals("did the right join type get set? ",
+                    ((LOJoin) op).getJoinType().toString(), expectedJoinTypes[i]);
+            pigServer.discardBatch();
+        }
+    }
+    
+    @Test
+    public void testNotPinnedJinOption() throws IOException {
+        pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+        pigServer.registerQuery("b = load '/tmp' as (foo, bar);");
+        pigServer.registerQuery("c = join a by foo, b by foo;");
+        LogicalOperator op = getOpByAlias(pigServer.getAliases().get("c"), "c");
+        assertEquals("default join should be hash", 
+                ((LOJoin) op).getJoinType().toString(), "HASH");
+        assertFalse(op.isPinnedOption(LOJoin.OPTION_JOIN));
+    }
+    
+    @Test
+    public void testGroupOptions() throws IOException {
+        pigServer.setBatchOn();
+        pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+        pigServer.registerQuery("b = group a by foo;");
+        
+        LogicalOperator op = getOpByAlias(pigServer.getAliases().get("b"), "b");
+        assertFalse(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE));
+        pigServer.discardBatch();
+        
+        pigServer.setBatchOn();
+        pigServer.registerQuery("a = load '/tmp' as (foo, bar);");
+        pigServer.registerQuery("b = group a by foo using \"collected\";");
+        op = getOpByAlias(pigServer.getAliases().get("b"), "b");
+        assertTrue(op.isPinnedOption(LOCogroup.OPTION_GROUPTYPE));
+        pigServer.discardBatch();
+    }
+    
+    private LogicalOperator getOpByAlias(LogicalPlan lp, String alias) {
+        for (LogicalOperator op : lp) {
+            if (op.getAlias().equals(alias)) return op;
+        }
+        return null;
+    }
+    
+}


Reply via email to