Author: pradeepkth
Date: Wed Sep  2 19:30:42 2009
New Revision: 810677

URL: http://svn.apache.org/viewvc?rev=810677&view=rev
Log:
PIG-935: Skewed join throws an exception when used with map keys (sriranjan via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
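
For context, the failing shape is a skewed join whose key is an expression over a map lookup. A minimal Pig Latin sketch of such a query, with illustrative aliases and file names modeled on the new test case below:

    A = LOAD 'SkewedJoinInput4.txt' AS (m:[]);
    B = LOAD 'SkewedJoinInput4.txt' AS (n:[]);
    -- the cast makes the join key an expression rather than a bare column,
    -- so the key plan's leaf is not a plain POProject
    C = JOIN A BY (chararray)m#'a100', B BY (chararray)n#'a100' USING 'skewed' PARALLEL 20;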

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep  2 19:30:42 2009
@@ -60,6 +60,9 @@
 
 BUG FIXES
 
+    PIG-935: Skewed join throws an exception when used with map keys(sriranjan
+    via pradeepkth)
+
     PIG-934: Merge join implementation currently does not seek to right point
     on the right side input based on the offset provided by the index
     (ashutoshc via pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Sep  2 19:30:42 2009
@@ -1489,9 +1489,16 @@
             Pair[] ret = new Pair[plans.size()]; 
             int i=-1;
             for (PhysicalPlan plan : plans) {
-                if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
-                int first = ((POProject)plan.getLeaves().get(0)).getColumn();
-                byte second = ((POProject)plan.getLeaves().get(0)).getResultType();
+                PhysicalOperator op = plan.getLeaves().get(0);
+                int first = -1;
+                if (op instanceof POProject) {
+                    if (((POProject)op).isStar()) return null;
+                    first = ((POProject)op).getColumn();
+                } else {
+                    // the plan is not POProject, so we don't know the column index
+                    first = -1;
+                }
+                byte second = plan.getLeaves().get(0).getResultType();
                 ret[++i] = new Pair<Integer,Byte>(first,second);
             }
             return ret;
@@ -1705,8 +1712,6 @@
        
        List<PhysicalOperator> l = plan.getPredecessors(op);
        List<PhysicalPlan> groups = (List<PhysicalPlan>)joinPlans.get(l.get(0));
-       
-       
        List<Boolean> ascCol = new ArrayList<Boolean>();
        for(int i=0; i<groups.size(); i++) {                                    
                ascCol.add(false);
@@ -1734,7 +1739,7 @@
        ep.add(uf);     
        ep.add(prjStar);
        ep.connect(prjStar, uf);
-               
+
         transformPlans.add(ep);      
         
        try{                    
@@ -1742,7 +1747,7 @@
                String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", "0.5");
                String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
                String inputFile = lFile.getFileName();
-                               
+
                return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,
                        PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile});
        }catch(Exception e) {
@@ -1818,6 +1823,14 @@
                 for (Pair<Integer,Byte> i : fields) {
                     PhysicalPlan ep = new PhysicalPlan();
                     POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                    // Check for i being equal to -1. -1 is used by getSortCols for a non POProject
+                    // operator. Since Order by does not allow expression operators, it should never
+                    // be set to -1
+                    if (i.first == -1) {
+                        int errCode = 2174;
+                        String msg = "Internal exception. Could not create a sampler job";
+                        throw new MRCompilerException(msg, errCode, PigException.BUG);
+                    }
                     prj.setColumn(i.first);
                     prj.setOverloaded(false);
                     prj.setResultType(i.second);
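
Note: the check above is defensive. As the in-line comment indicates, this per-column projection path serves ORDER BY sampling, and ORDER BY keys are plain column references, so a -1 from getSortCols here would signal an internal error rather than a user mistake. An illustrative Pig Latin contrast (hypothetical aliases):

    -- legal ORDER BY keys are bare (or starred) column references,
    -- so getSortCols always knows the column index here
    D = ORDER A BY $0;
    -- a skewed join key may be an expression, e.g. a cast of a map
    -- lookup, for which getSortCols now reports -1
    C = JOIN A BY (chararray)m#'a100', B BY (chararray)n#'a100' USING 'skewed';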

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Wed Sep  2 19:30:42 2009
@@ -40,6 +40,7 @@
     private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
     private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
     private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
+    private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
     
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -48,7 +49,7 @@
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
        // pigServer = new PigServer(ExecType.LOCAL);
        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
-        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.1");
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.01");
     }
     
     @Before
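
The lowered memusage setting presumably forces the sampler to flag skewed keys even on these small test inputs. For reference, the same properties can also be set from a Pig script via the SET command; a sketch using the values from the test above:

    set pig.skewedjoin.reduce.memusage 0.01;
    set pig.skewedjoin.reduce.maxtuple 5;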
@@ -93,9 +94,16 @@
 
        w3.close();
        
+        PrintWriter w4 = new PrintWriter(new FileWriter(INPUT_FILE4));
+        for(int i=0; i < 100; i++) {
+            w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
+        }
+        w4.close();
+
        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+       Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
     }
     
     @After
@@ -103,10 +111,12 @@
        new File(INPUT_FILE1).delete();
        new File(INPUT_FILE2).delete();
        new File(INPUT_FILE3).delete();
+        new File(INPUT_FILE4).delete();
        
         Util.deleteFile(cluster, INPUT_FILE1);
         Util.deleteFile(cluster, INPUT_FILE2);
         Util.deleteFile(cluster, INPUT_FILE3);
+        Util.deleteFile(cluster, INPUT_FILE4);
     }
     
     
@@ -177,4 +187,26 @@
         
         fail("Should throw exception, do not support 3 way join");
     }       
+
+    public void testSkewedJoinMapKey() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE4 + "' as (m:[]);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE4 + "' as (n:[]);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by (chararray)m#'a100', B by (chararray)n#'a100' using \"skewed\" parallel 20;");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            fail("Should support maps and expression operators as keys");
+        }
+    }
 }

