Author: olga
Date: Fri Sep 18 04:55:49 2009
New Revision: 816470

URL: http://svn.apache.org/viewvc?rev=816470&view=rev
Log:
PIG-964: Handling null in skewed join (sriranjan via olgan)

Modified:
    hadoop/pig/branches/branch-0.5/CHANGES.txt
    hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java
    hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java
    hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/CHANGES.txt?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.5/CHANGES.txt Fri Sep 18 04:55:49 2009
@@ -85,6 +85,8 @@
 PIG-792: skew join implementation (sriranjan via olgan)
 
 BUG FIXES
+    PIG-964: Handling null in skewed join (sriranjan via olgan)
+
     PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)
 
     PIG-957: Tutorial is broken with 0.4 branch and trunk (pradeepkth)

Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java Fri Sep 18 04:55:49 2009
@@ -59,5 +59,6 @@
     UNABLE_TO_CLOSE_SPILL_FILE,
     UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
     USING_OVERLOADED_FUNCTION,
+    REDUCER_COUNT_LOW,
     NULL_COUNTER_COUNT;
 }

Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Sep 18 04:55:49 2009
@@ -77,20 +77,20 @@
                // for partition table, compute the index based on the sampler output
                Pair <Integer, Integer> indexes;
                Integer curIndex = -1;
-               Tuple keyTuple = null;
+               Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
                
                // extract the key from nullablepartitionwritable
                PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
 
-               if (key instanceof NullableTuple) {
+               try {
+                       keyTuple.set(0, key.getValueAsPigType());
+               } catch (ExecException e) {
+                       return -1;
+               }
+               
+               // if the key is not null and key 
+               if (key instanceof NullableTuple && key.getValueAsPigType() != null) {
                        keyTuple = (Tuple)key.getValueAsPigType();
-               } else {
-                       keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
-                       try {
-                               keyTuple.set(0, key.getValueAsPigType());
-                       } catch (ExecException e) {
-                               return -1;
-                       }
                }
 
                indexes = reducerMap.get(keyTuple);

Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri Sep 18 04:55:49 2009
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -198,8 +199,13 @@
                        }
 
                        if (maxReducers > totalReducers_) {
-                               throw new RuntimeException("You need at least " + maxReducers
-                                               + " reducers to run this job.");
+                               if(pigLogger != null) {
+                    pigLogger.warn(this,"You need at least " + maxReducers
+                               + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+                } else {
+                               log.warn("You need at least " + maxReducers
+                                               + " reducers to avoid spillage and run this job efficiently.");
+                }
                        }
 
                        output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));

Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java Fri Sep 18 04:55:49 2009
@@ -152,7 +152,7 @@
         // we move to next boundry
         t = loader.getSampledTuple();        
         long finalPos = loader.getPosition();
-        
+
         long toSkip = skipInterval - (finalPos - initialPos);
         if (toSkip > 0) {
             long rc = loader.skip(toSkip);
@@ -187,7 +187,7 @@
         }
         
         // add size of the tuple at the end
-        m.set(t.size(), (finalPos-middlePos));
+        m.set(t.size(), (finalPos-middlePos) + 1); // offset 1 for null
         
         return m;              
        }

Modified: hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java Fri Sep 18 04:55:49 2009
@@ -41,6 +41,7 @@
     private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
     private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
     private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
+    private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -99,11 +100,26 @@
             w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
         }
        w4.close();
-
+       
+       // Create a file with null keys
+       PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_FILE5));
+        for(int i=0; i < 10; i++) {
+               w5.println("\tapple1");
+        }
+        w5.println("100\tapple2");
+        for(int i=0; i < 10; i++) {
+               w5.println("\torange1");
+        }
+        w5.println("\t");
+        w5.println("100\t");
+        w5.close();
+        
        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+       Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
+
     }
     
     @After
@@ -118,6 +134,8 @@
         Util.deleteFile(cluster, INPUT_FILE2);
         Util.deleteFile(cluster, INPUT_FILE3);
         Util.deleteFile(cluster, INPUT_FILE4);
+        Util.deleteFile(cluster, INPUT_FILE5);
+
     }
     
     
@@ -194,10 +212,9 @@
                 }
             }
         }catch(Exception e) {
-               return;
+               fail("Should not throw exception, should continue execution");
         }
         
-        fail("Should throw exception, not enough reducers");
     }
     
     public void testSkewedJoin3Way() throws IOException{
@@ -286,4 +303,25 @@
          }
     }
     
+    public void testSkewedJoinNullKeys() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+                       System.out.println(e.getMessage());
+                       e.printStackTrace();
+               fail("Should support null keys in skewed join");
+        }
+        return;
+    }
+    
 }


Reply via email to