Author: olga
Date: Sun Sep 20 01:10:57 2009
New Revision: 816978

URL: http://svn.apache.org/viewvc?rev=816978&view=rev
Log:
PIG-964: Handling null in skewed join (sriranjan via olgan)

Modified:
    hadoop/pig/branches/branch-0.4/CHANGES.txt
    hadoop/pig/branches/branch-0.4/build.xml
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java
    hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java
    hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.4/CHANGES.txt Sun Sep 20 01:10:57 2009
@@ -73,6 +73,10 @@
 
 BUG FIXES
 
+    PIG-964: Handling null in skewed join (sriranjan via olgan)
+
+    PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)
+
     PIG-963: Join in local mode matches null keys (pradeepkth)
 
     PIG-957: Tutorial is broken with 0.4 branch and trunk (pradeepkth)

Modified: hadoop/pig/branches/branch-0.4/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/build.xml?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/build.xml (original)
+++ hadoop/pig/branches/branch-0.4/build.xml Sun Sep 20 01:10:57 2009
@@ -493,7 +493,7 @@
         <mkdir dir="${dist.dir}/license" />
 
         <copy todir="${dist.dir}/lib" includeEmptyDirs="false">
-            <fileset dir="${ivy.lib.dir}"/>
+            <!--fileset dir="${ivy.lib.dir}"/-->
             <fileset dir="${lib.dir}"/>
         </copy>
 

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java Sun Sep 20 01:10:57 2009
@@ -59,5 +59,6 @@
     UNABLE_TO_CLOSE_SPILL_FILE,
     UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
     USING_OVERLOADED_FUNCTION,
+    REDUCER_COUNT_LOW,
     NULL_COUNTER_COUNT;
 }

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Sun Sep 20 01:10:57 2009
@@ -81,7 +81,7 @@
         POLoad load = (POLoad)po;
         String loadFunc = load.getLFile().getFuncName();
         String loadFile = load.getLFile().getFileName();
-        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && 
!("org.apache.pig.impl.builtin.SkewedJoinSampleLoader".equals(loadFunc))) {
+        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && 
!("org.apache.pig.impl.builtin.PoissonSampleLoader".equals(loadFunc))) {
             log.debug("Not a sampling job.");
             return;
         }

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Sun Sep 20 01:10:57 2009
@@ -77,20 +77,20 @@
                // for partition table, compute the index based on the sampler 
output
                Pair <Integer, Integer> indexes;
                Integer curIndex = -1;
-               Tuple keyTuple = null;
+               Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
                
                // extract the key from nullablepartitionwritable
                PigNullableWritable key = ((NullablePartitionWritable) 
wrappedKey).getKey();
 
-               if (key instanceof NullableTuple) {
+               try {
+                       keyTuple.set(0, key.getValueAsPigType());
+               } catch (ExecException e) {
+                       return -1;
+               }
+               
+               // if the key is not null and key 
+               if (key instanceof NullableTuple && key.getValueAsPigType() != 
null) {
                        keyTuple = (Tuple)key.getValueAsPigType();
-               } else {
-                       keyTuple = 
DefaultTupleFactory.getInstance().newTuple(1);
-                       try {
-                               keyTuple.set(0, key.getValueAsPigType());
-                       } catch (ExecException e) {
-                               return -1;
-                       }
                }
 
                indexes = reducerMap.get(keyTuple);

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Sun Sep 20 01:10:57 2009
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -198,8 +199,13 @@
                        }
 
                        if (maxReducers > totalReducers_) {
-                               throw new RuntimeException("You need at least " 
+ maxReducers
-                                               + " reducers to run this job.");
+                               if(pigLogger != null) {
+                    pigLogger.warn(this,"You need at least " + maxReducers
+                               + " reducers to avoid spillage and run this job 
efficiently.", PigWarning.REDUCER_COUNT_LOW);
+                } else {
+                               log.warn("You need at least " + maxReducers
+                                               + " reducers to avoid spillage 
and run this job efficiently.");
+                }
                        }
 
                        output.put(PARTITION_LIST, 
mBagFactory.newDefaultBag(reducerList));

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java Sun Sep 20 01:10:57 2009
@@ -152,7 +152,7 @@
         // we move to next boundry
         t = loader.getSampledTuple();        
         long finalPos = loader.getPosition();
-        
+
         long toSkip = skipInterval - (finalPos - initialPos);
         if (toSkip > 0) {
             long rc = loader.skip(toSkip);
@@ -187,7 +187,7 @@
         }
         
         // add size of the tuple at the end
-        m.set(t.size(), (finalPos-middlePos));
+        m.set(t.size(), (finalPos-middlePos) + 1); // offset 1 for null
         
         return m;              
        }

Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java Sun Sep 20 01:10:57 2009
@@ -180,4 +180,36 @@
         }
         tmpFile.delete();
     }
+    
+    @Test
+    public void testPoissonSampleOptimizer() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan("B = load 'input' using PigStorage('\t');");
+        planTester.buildPlan(" C = join A by $0, B by $0 using \"skewed\";");
+        LogicalPlan lp = planTester.buildPlan("store C into 'output';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        int count = 1;
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }        
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3,count);
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        count = 1;
+        mrOper = mrPlan.getRoots().get(0);
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }        
+        // After optimizer visits, number of MR jobs = 2
+        assertEquals(2,count);
+    }
 }

Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=816978&r1=816977&r2=816978&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Sun Sep 20 01:10:57 2009
@@ -41,6 +41,7 @@
     private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
     private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
     private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
+    private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -99,11 +100,26 @@
             
w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
         }
        w4.close();
-
+       
+       // Create a file with null keys
+       PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_FILE5));
+        for(int i=0; i < 10; i++) {
+               w5.println("\tapple1");
+        }
+        w5.println("100\tapple2");
+        for(int i=0; i < 10; i++) {
+               w5.println("\torange1");
+        }
+        w5.println("\t");
+        w5.println("100\t");
+        w5.close();
+        
        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+       Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
+
     }
     
     @After
@@ -118,6 +134,8 @@
         Util.deleteFile(cluster, INPUT_FILE2);
         Util.deleteFile(cluster, INPUT_FILE3);
         Util.deleteFile(cluster, INPUT_FILE4);
+        Util.deleteFile(cluster, INPUT_FILE5);
+
     }
     
     
@@ -194,10 +212,9 @@
                 }
             }
         }catch(Exception e) {
-               return;
+               fail("Should not throw exception, should continue execution");
         }
         
-        fail("Should throw exception, not enough reducers");
     }
     
     public void testSkewedJoin3Way() throws IOException{
@@ -286,4 +303,25 @@
          }
     }
     
+    public void testSkewedJoinNullKeys() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as 
(id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as 
(id,name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id, B by id using 
\"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+                       System.out.println(e.getMessage());
+                       e.printStackTrace();
+               fail("Should support null keys in skewed join");
+        }
+        return;
+    }
+    
 }


Reply via email to