Author: olga
Date: Fri Sep 11 23:20:52 2009
New Revision: 814075

URL: http://svn.apache.org/viewvc?rev=814075&view=rev
Log:
PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
    configured (yinghe via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Sep 11 23:20:52 2009
@@ -83,6 +83,9 @@
 
 BUG FIXES
 
+    PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
+    configured(yinghe via olgan)
+
     PIG-882: log level not propogated to loggers - duplicate message (daijy)
 
     PIG-943: Pig crash when it cannot get counter from hadoop (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Sep 11 23:20:52 2009
@@ -1745,7 +1745,8 @@
         
        try{                    
                // pass configurations to the User Function
-               String per = 
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", "0.5");
+               String per = 
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", 
+                                   
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
                String mc = 
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
                String inputFile = lFile.getFileName();
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java 
Fri Sep 11 23:20:52 2009
@@ -65,6 +65,8 @@
 
        public static final String TOTAL_REDUCERS = "totalreducers";
 
+       public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+
        private Log log = LogFactory.getLog(getClass());
 
        BagFactory mBagFactory = BagFactory.getInstance();
@@ -104,7 +106,7 @@
                        tupleMCount_ = Integer.parseInt(args[1]);
                        inputFile_ = args[2];                   
                } else {
-                       heapPercentage_ = 0.5;
+                       heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
                }
                
                if (log.isDebugEnabled()) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
Fri Sep 11 23:20:52 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 
 /**
  * Currently skipInterval is similar to the randomsampleloader. However, if we 
were to use an
@@ -106,7 +107,14 @@
                }
                
                // % of memory available for the records
-               float heapPerc = 
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+               float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+                if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+                   try {
+                        heapPerc = 
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+                    }catch(NumberFormatException e) {
+                       // ignore, use default value
+                    }
+                }
                
                // we are only concerned with the first input for skewed join
                String fname = inputs.get(0).first.getFileName();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Sep 11 
23:20:52 2009
@@ -147,6 +147,38 @@
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }      
     
+    public void testSkewedJoinWithNoProperties() throws IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, 
n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, 
name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by (id, name), B by (id, 
name) using \"skewed\" parallel 5;");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+           {
+                pigServer.registerQuery("E = join A by(id, name), B by (id, 
name);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
+
+               while(iter.hasNext()) {
+                    dbshj.add(iter.next());
+               }
+            }
+            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        }catch(Exception e) {
+             fail(e.getMessage());
+        }
+    }
+
     public void testSkewedJoinReducers() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, 
n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, 
name);");


Reply via email to