Author: daijy
Date: Mon Jun  7 04:58:30 2010
New Revision: 952098

URL: http://svn.apache.org/viewvc?rev=952098&view=rev
Log:
PIG-282: Custom Partitioner

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jun  7 04:58:30 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-282: Custom Partitioner (aniket486 via daijy)
+
 PIG-283: Allow to set arbitrary jobconf key-value pairs inside pig program 
(hashutosh)
 
 PIG-1373: We need to add jdiff output to docs on the website (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Mon Jun  7 04:58:30 2010
@@ -376,7 +376,7 @@ public class JobControlCompiler{
                 }
             }
 
-            //Create the jar of all functions reuired
+            //Create the jar of all functions and classes required
             File submitJarFile = File.createTempFile("Job", ".jar");
             // ensure the job jar is deleted on exit
             submitJarFile.deleteOnExit();
@@ -530,6 +530,8 @@ public class JobControlCompiler{
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
                 if (mro.requestedParallelism>0)
                     nwJob.setNumReduceTasks(mro.requestedParallelism);
+                if (mro.customPartitioner != null)
+                       
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
                 conf.set("pig.mapPlan", 
ObjectSerializer.serialize(mro.mapPlan));
                 if(mro.isEndOfAllInputSetInMap()) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Mon Jun  7 04:58:30 2010
@@ -30,7 +30,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -986,6 +985,7 @@ public class MRCompiler extends PhyPlanV
     public void visitGlobalRearrange(POGlobalRearrange op) throws 
VisitorException{
         try{
             blocking(op);
+            curMROp.customPartitioner = op.getCustomPartitioner();
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
@@ -1673,7 +1673,8 @@ public class MRCompiler extends PhyPlanV
                                           
                        
                        // create POGlobalRearrange
-                       POGlobalRearrange gr = new POGlobalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)), rp);                          
+                       POGlobalRearrange gr = new POGlobalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+                       // Skewed join has its own special partitioner 
                        gr.setResultType(DataType.TUPLE);
                        gr.visit(this);
                        if(gr.getRequestedParallelism() > 
curMROp.requestedParallelism)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Mon Jun  7 04:58:30 2010
@@ -112,6 +112,9 @@ public class MapReduceOper extends Opera
     
     int requestedParallelism = -1;
     
+    /* Name of the Custom Partitioner used */ 
+    String customPartitioner = null;
+    
     // Last POLimit value in this map reduce operator, needed by LimitAdjuster
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
@@ -367,6 +370,10 @@ public class MapReduceOper extends Opera
     public int getRequestedParallelism() {
         return requestedParallelism;
     }
+    
+    /**
+     * Returns the partitioner class name requested for this map-reduce job
+     * (later resolved with PigContext.resolveClassName when the job is built),
+     * or {@code null} when the default partitioner should be used.
+     */
+    public String getCustomPartitioner() {
+        return this.customPartitioner;
+    }
 
     public void setSplitter(boolean spl) {
         splitter = spl;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Mon Jun  7 04:58:30 2010
@@ -530,6 +530,7 @@ public class LogToPhyTranslationVisitor 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                 scope, nodeGen.getNextNodeId(scope)), cs
                 .getRequestedParallelism());
+        poGlobal.setCustomPartitioner(cs.getCustomPartitioner());
         poGlobal.setAlias(cs.getAlias());
         POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), cs.getRequestedParallelism());
@@ -667,9 +668,9 @@ public class LogToPhyTranslationVisitor 
 
         case REGULAR:
             POPackage poPackage = 
compileToLR_GR_PackTrio(cg.getOperatorKey().scope,
-                    cg.getInputs(),cg.getRequestedParallelism(),cg.getAlias(),
-                    cg.getInner(),cg.getGroupByPlans());
-
+                    cg.getInputs(), cg.getRequestedParallelism(), 
cg.getCustomPartitioner(),
+                    cg.getAlias(), cg.getInner(),cg.getGroupByPlans());
+            
             logToPhyMap.put(cg, poPackage);
             break;
             
@@ -769,12 +770,13 @@ public class LogToPhyTranslationVisitor 
     }
     
     private POPackage compileToLR_GR_PackTrio(String 
scope,List<LogicalOperator> inputs,
-            int parallel, String alias, boolean[] innerFlags, 
MultiMap<LogicalOperator, 
+            int parallel, String customPartitioner, String alias, boolean[] 
innerFlags, MultiMap<LogicalOperator, 
             LogicalPlan> innerPlans) throws VisitorException {
 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                 scope, nodeGen.getNextNodeId(scope)), parallel);
         poGlobal.setAlias(alias);
+        poGlobal.setCustomPartitioner(customPartitioner);
         POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), parallel);
         poPackage.setAlias(alias);
@@ -1118,8 +1120,7 @@ public class LogToPhyTranslationVisitor 
             return;
         }
                else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
-                   
-                   POPackage poPackage = compileToLR_GR_PackTrio(scope, 
inputs, parallel, alias, innerFlags, loj.getJoinPlans());
+                   POPackage poPackage = compileToLR_GR_PackTrio(scope, 
inputs, parallel, loj.getCustomPartitioner(), alias, innerFlags, 
loj.getJoinPlans());
                POForEach fe = compileFE4Flattening(innerFlags,  scope, 
parallel, alias, inputs);
             currentPlan.add(fe);
             try {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
 Mon Jun  7 04:58:30 2010
@@ -43,6 +43,19 @@ public class POGlobalRearrange extends P
      * 
      */
     private static final long serialVersionUID = 1L;
+    
+    /*
+     * POGlobalRearrange marks the map/reduce boundary, so the name of any
+     * user-requested partitioner is carried on this operator.
+     */
+    protected String customPartitioner;
+
+    /**
+     * @return the custom partitioner class name, or {@code null} if none was set
+     */
+    public String getCustomPartitioner() {
+        return this.customPartitioner;
+    }
+
+    /**
+     * @param customPartitioner class name of the partitioner to use for this
+     *        shuffle, or {@code null} for the default
+     */
+    public void setCustomPartitioner(String customPartitioner) {
+        this.customPartitioner = customPartitioner;
+    }
 
     public POGlobalRearrange(OperatorKey k) {
         this(k, -1, null);
@@ -51,7 +64,7 @@ public class POGlobalRearrange extends P
     public POGlobalRearrange(OperatorKey k, int rp) {
         this(k, rp, null);
     }
-
+    
     public POGlobalRearrange(OperatorKey k, List inp) {
         this(k, -1, null);
     }
@@ -59,7 +72,7 @@ public class POGlobalRearrange extends P
     public POGlobalRearrange(OperatorKey k, int rp, List inp) {
         super(k, rp, inp);
     }
-
+    
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitGlobalRearrange(this);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java 
Mon Jun  7 04:58:30 2010
@@ -89,9 +89,22 @@ abstract public class LogicalOperator ex
      * by the user or can be chosen at runtime by the optimizer.
      */
     protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();
+    
+    /**
+     * Name of the customPartitioner if one is used, this is set to null 
otherwise.
+     */
+    protected String mCustomPartitioner = null;
 
     
-    private static Log log = LogFactory.getLog(LogicalOperator.class);
+    /**
+     * Returns the name of the custom partitioner class attached to this
+     * operator, or {@code null} if the query did not specify one.
+     */
+    public String getCustomPartitioner() {
+        return this.mCustomPartitioner;
+    }
+
+    /**
+     * Records the partitioner class name given in a PARTITION BY clause.
+     *
+     * @param customPartitioner partitioner class name, or {@code null} for none
+     */
+    public void setCustomPartitioner(String customPartitioner) {
+        mCustomPartitioner = customPartitioner;
+    }
+
+    private static Log log = LogFactory.getLog(LogicalOperator.class);
 
     /**
      * Equivalent to LogicalOperator(k, 0).

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Mon Jun  7 04:58:30 2010
@@ -33,6 +33,7 @@ package org.apache.pig.impl.logicalLayer
 import java.io.*;
 import java.util.*;
 import java.net.URI;
+import java.lang.Class;
 import java.net.URISyntaxException;
 import java.lang.reflect.Type;
 import org.apache.pig.impl.logicalLayer.*;
@@ -71,7 +72,9 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.impl.util.LinkedMultiMap;
 
 public class QueryParser {
@@ -900,6 +903,23 @@ class FunctionType {
             throw new Exception("Received an unknown function type: " + 
funcType);
         }
     }
+};
+
+class ClassType {
+       public static final byte UNKNOWNCLASS = 0;
+       public static final byte PARTITIONER = 2;
+       
+       /**
+        * Checks that a class named in a query can be used in the role given by
+        * classType, throwing an Exception with a user-facing message otherwise.
+        * The class is only inspected reflectively; it is never instantiated, so
+        * no user constructor code runs while the query is being parsed.
+        *
+        * @param cs        the resolved class to check
+        * @param classType one of the ClassType constants (currently PARTITIONER)
+        * @throws Exception if cs is unsuitable or classType is unknown
+        */
+       public static void checkClassType(Class cs, byte classType) throws Exception {
+               switch(classType) {
+               case ClassType.PARTITIONER:
+                       if(!Partitioner.class.isAssignableFrom(cs)) {
+                               throw new Exception("Not a class of org.apache.hadoop.mapreduce.Partitioner");
+                       }
+                       // The partitioner is later handed to Hadoop as a class and created
+                       // reflectively (assumed: via its no-argument constructor, as
+                       // ReflectionUtils.newInstance does), so reject classes that could
+                       // never be instantiated instead of failing at job run time.
+                       if(cs.isInterface() || java.lang.reflect.Modifier.isAbstract(cs.getModifiers())) {
+                               throw new Exception("Partitioner " + cs.getName() + " must be a concrete class");
+                       }
+                       try {
+                               cs.getDeclaredConstructor();
+                       } catch (NoSuchMethodException e) {
+                               throw new Exception("Partitioner " + cs.getName() + " must have a no-argument constructor", e);
+                       }
+                       break;
+               default:
+                       throw new Exception("Received an unknown class type: " + classType);
+               }
+       }
 }
 
 PARSER_END(QueryParser)
@@ -942,6 +962,7 @@ TOKEN : { <INNER : "inner"> }
 TOKEN : { <OUTER : "outer"> }
 TOKEN : { <STAR : "*">                 }
 TOKEN : { <PARALLEL : "parallel"> }
+TOKEN : { <PARTITION : "partition by"> }
 TOKEN : { <GROUP : "group"> }
 TOKEN : { <AND : "and"> }
 TOKEN : { <OR : "or"> }
@@ -1124,7 +1145,6 @@ LogicalOperator SplitClause(LogicalPlan 
        {log.trace("Exiting SplitClause"); return splitOp;}
 } 
 
-
 LogicalOperator Expr(LogicalPlan lp) : 
 {
        LogicalOperator op; 
@@ -1164,7 +1184,7 @@ Token IdentifierOrReserved() :
 }
 {
   (
-  ( t1 = <DEFINE> )
+ ( t1 = <DEFINE> )
 | (t1 = <LOAD> )
 | (t1 =<FILTER> )
 | (t1 =<FOREACH> )
@@ -1187,6 +1207,7 @@ Token IdentifierOrReserved() :
 | (t1 =<INNER> )
 | (t1 =<OUTER> )
 | (t1 =<PARALLEL> )
+| (t1 =<PARTITION>)
 | (t1 =<GROUP> )
 | (t1 =<AND> )
 | (t1 =<OR> )
@@ -1260,6 +1281,7 @@ LogicalOperator BaseExpr(LogicalPlan lp)
        Token t1, t2; 
        Schema.FieldSchema fs; 
        log.trace("Entering BaseExpr");
+       String partitioner = null;
 }
 {
        (
@@ -1289,9 +1311,11 @@ LogicalOperator BaseExpr(LogicalPlan lp)
 |   (<SAMPLE> op = SampleClause(lp))
 |   (<ORDER> op = OrderClause(lp))
 |      (<DISTINCT> op = NestedExpr(lp) 
+       ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
        {
                LogicalOperator distinct = new LODistinct(lp, new 
OperatorKey(scope, getNextId())); 
                lp.add(distinct);
+               distinct.setCustomPartitioner(partitioner);
                log.debug("Added operator: " + distinct.getClass().getName() + 
" to the logical plan"); 
                lp.connect(op, distinct);
                log.debug("Connected alias: " + op.getAlias() + " operator " + 
op.getClass().getName() + " to operator " + distinct.getClass().getName());
@@ -1798,6 +1822,7 @@ LogicalOperator CogroupClause(LogicalPla
     LogicalOperator cogroup = null; 
     log.trace("Entering CoGroupClause");
     Token t;
+    String partitioner = null;
 }
 {
     (gi = GroupItem(lp) { gis.add(gi); }
@@ -1817,15 +1842,18 @@ LogicalOperator CogroupClause(LogicalPla
             cogroup = parseUsingForGroupBy("merge", gis, lp);
             }
         )])
+        ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
     )
-
     {
         if (cogroup != null) {
+               cogroup.setCustomPartitioner(partitioner);
             log.trace("Exiting CoGroupClause");
             return cogroup;
         }
-
         cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+        if(cogroup != null) {
+               cogroup.setCustomPartitioner(partitioner);
+        }
         log.trace("Exiting CoGroupClause");
         return cogroup;                
     }
@@ -2079,7 +2107,8 @@ int ColNameOrNum(Schema over) : 
 LogicalOperator CrossClause(LogicalPlan lp) : 
 {
        LogicalOperator op; 
-       ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
+       ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+       String partitioner = null; 
        log.trace("Entering CrossClause");
 }
 {
@@ -2087,9 +2116,11 @@ LogicalOperator CrossClause(LogicalPlan 
        op = NestedExpr(lp) { inputs.add(op); }
        ("," op = NestedExpr(lp) { inputs.add(op); })+
        )
+       ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
        {
                LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, 
getNextId()));
                lp.add(cross);
+               cross.setCustomPartitioner(partitioner);
                log.debug("Added operator " + cross.getClass().getName() + " to 
the logical plan");
                
                for (LogicalOperator lop: inputs) {
@@ -2115,6 +2146,7 @@ LogicalOperator JoinClause(LogicalPlan l
        boolean isFullOuter = false;
        boolean isOuter = false;
        Token t;
+       String partitioner = null;
 }
 {
        (gi = JoinItem(lp) { gis.add(gi); }
@@ -2126,7 +2158,6 @@ LogicalOperator JoinClause(LogicalPlan l
         (<FULL> [<OUTER>] {isFullOuter = true;})
        ]
        ("," gi = JoinItem(lp) { gis.add(gi); })+
-       
        {
                // in the case of outer joins, only two
                // inputs are allowed
@@ -2182,15 +2213,23 @@ LogicalOperator JoinClause(LogicalPlan l
             joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, 
isRightOuter, isOuter);
                        }
      )]))
-
-       {log.trace("Exiting JoinClause");
-       if (joinOp!=null) {
+     ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
+       
+       {
+               log.trace("Exiting JoinClause");
+               if (joinOp == null) {
+                       joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
+               }
+               if(partitioner != null) {
+                       if(((LOJoin)joinOp).getJoinType() == 
LOJoin.JOINTYPE.SKEWED) {
+                               throw new ParseException("Custom Partitioner is 
not supported for skewed join");
+                       }
+                       else {
+                               joinOp.setCustomPartitioner(partitioner);
+                       }
+               }
                return joinOp;
        }
-       else {
-               return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
-       }}
-       
 }
 
 LogicalOperator UnionClause(LogicalPlan lp) : 
@@ -3703,6 +3742,35 @@ Object  EvalFunction(byte funcType) : 
        }
 }
 
+/**
+ * Parses a (possibly package-qualified) class name, resolves it through
+ * PigContext and checks it against the expected ClassType role.
+ * Returns the class name exactly as written in the query.
+ */
+String EvalClass(byte classType) :
+{
+       String className;
+       Class cs;
+       log.trace("Entering EvalClass");
+}
+{
+       className = QualifiedFunction()
+       {
+               try {
+                       // Resolution must happen inside the try: checkClassType only throws
+                       // plain Exception, so the ExecException handler below ("not found")
+                       // can only ever fire for a failed class lookup.
+                       cs = PigContext.resolveClassName(className);
+                       ClassType.checkClassType(cs, classType);
+               }
+               catch (ExecException e) {
+                       ParseException pe = new ParseException("Class " + className + " not found");
+                       pe.initCause(e);
+                       throw pe;
+               }
+               catch (Exception e){
+                       // Some exceptions carry no message; fall back to toString() so the
+                       // user never sees a bare "null" parse error.
+                       String msg = (e.getMessage() != null) ? e.getMessage() : e.toString();
+                       ParseException pe = new ParseException(msg);
+                       pe.initCause(e);
+                       throw pe;
+               }
+               log.trace("Exiting EvalClass");
+               
+               return className;
+       }
+}
+
 /**
  * Bug 831620 - '$' support
  */

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Jun  7 
04:58:30 2010
@@ -17,11 +17,15 @@
  */
 package org.apache.pig.test;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -36,6 +40,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -489,5 +494,154 @@ public class TestEvalPipeline2 extends T
         
         Util.deleteFile(cluster, "table_testNestedDescSort");
     }
-
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerParseJoins() throws Exception{
+       String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", 
input);
+        
+        pigServer.registerQuery("A = LOAD 
'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+        
+        pigServer.registerQuery("B = ORDER A by $0;");
+        
+        // Custom Partitioner is not allowed for skewed joins, will throw a 
ExecException 
+        try {
+               pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 
'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+               //control should not reach here
+               fail("Skewed join cannot accept a custom partitioner");
+        }
+        catch (FrontendException e) {
+               assertTrue(e.getErrorCode() == 1000);
+               }
+        
+        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' 
PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("hash");
+        Tuple t;
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        // No checks are made for merged and replicated joins as they are 
compiled to a map only job 
+        // No frontend error checking has been added for these jobs, hence not 
adding any test cases 
+        // Manually tested the sanity once. Above test should cover the basic 
sanity of the scenario 
+        
+        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+    }
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception{
+       String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", 
input);
+        
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' 
as (a0:int, a1:int);");
+        
+        // It should be noted that for a map reduce job, the total number of 
partitions 
+        // is the same as the number of reduce tasks for the job. Hence we 
need to find a case wherein 
+        // we will get more than one reduce job so that we can use the 
partitioner.    
+        // The following logic assumes that we get 2 reduce jobs, so that we 
can hard-code the logic.
+        //
+        pigServer.registerQuery("B = group A by $0 PARTITION BY 
org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+        
+        pigServer.store("B", "tmp_testCustomPartitionerGroups");
+        
+        new File("tmp_testCustomPartitionerGroups").mkdir();
+        
+        // SimpleCustomPartitioner partitions as per the parity of the key
+        // Need to change this in SimpleCustomPartitioner is changed
+        Util.copyFromClusterToLocal(cluster, 
"tmp_testCustomPartitionerGroups/part-r-00000", 
"tmp_testCustomPartitionerGroups/part-r-00000");
+        BufferedReader reader = new BufferedReader(new 
FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
+           String line = null;              
+           while((line = reader.readLine()) != null) {
+               String[] cols = line.split("\t");
+               int value = Integer.parseInt(cols[0]) % 2;
+               assertEquals(0, value);
+           }
+           Util.copyFromClusterToLocal(cluster, 
"tmp_testCustomPartitionerGroups/part-r-00001", 
"tmp_testCustomPartitionerGroups/part-r-00001");
+        reader = new BufferedReader(new 
FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
+           line = null;             
+           while((line = reader.readLine()) != null) {
+               String[] cols = line.split("\t");
+               int value = Integer.parseInt(cols[0]) % 2;
+               assertEquals(1, value);
+           } 
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+    }
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerCross() throws Exception{
+       String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+       
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", 
input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' 
as (a0:int, a1:int);");
+        pigServer.registerQuery("B = ORDER A by $0;");
+        pigServer.registerQuery("C = cross A , B PARTITION BY 
org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Tuple t;
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+    }
 }

Added: 
hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java?rev=952098&view=auto
==============================================================================
--- 
hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java 
(added)
+++ 
hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java 
Mon Jun  7 04:58:30 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+public class SimpleCustomPartitioner extends Partitioner<PigNullableWritable, 
Writable> {
+
+       @Override
+       public int getPartition(PigNullableWritable key, Writable value, int 
numPartitions) {
+               if(key.getValueAsPigType() instanceof Integer) {
+                       int ret = 
(((Integer)key.getValueAsPigType()).intValue() % numPartitions);
+                       return ret;
+               }
+               else {
+                       return (key.hashCode()) % numPartitions;
+               }
+       }
+}


Reply via email to