Author: olga Date: Fri Sep 11 23:33:33 2009 New Revision: 814082 URL: http://svn.apache.org/viewvc?rev=814082&view=rev Log: PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not configured (yinghe via olgan)
Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=814082&r1=814081&r2=814082&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.4/CHANGES.txt Fri Sep 11 23:33:33 2009 @@ -73,6 +73,9 @@ BUG FIXES + PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not + configured(yinghe via olgan) + PIG-882: log level not propogated to loggers - duplicate message (daijy) PIG-943: Pig crash when it cannot get counter from hadoop (daijy) Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=814082&r1=814081&r2=814082&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 11 23:33:33 2009 @@ -1745,7 +1745,8 @@ try{ // pass configurations to the User Function - String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", "0.5"); + String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", + 
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE)); String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0"); String inputFile = lFile.getFileName(); Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=814082&r1=814081&r2=814082&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri Sep 11 23:33:33 2009 @@ -65,6 +65,8 @@ public static final String TOTAL_REDUCERS = "totalreducers"; + public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f; + private Log log = LogFactory.getLog(getClass()); BagFactory mBagFactory = BagFactory.getInstance(); @@ -104,7 +106,7 @@ tupleMCount_ = Integer.parseInt(args[1]); inputFile_ = args[2]; } else { - heapPercentage_ = 0.5; + heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE; } if (log.isDebugEnabled()) { Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=814082&r1=814081&r2=814082&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri Sep 11 23:33:33 2009 @@ -30,6 +30,7 @@ import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.builtin.PartitionSkewedKeys; /** * Currently skipInterval is similar to the 
randomsampleloader. However, if we were to use an @@ -106,7 +107,14 @@ } // % of memory available for the records - float heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); + float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; + if (pcProps.getProperty(PERC_MEM_AVAIL) != null) { + try { + heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); + }catch(NumberFormatException e) { + // ignore, use default value + } + } // we are only concerned with the first input for skewed join String fname = inputs.get(0).first.getFileName(); Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=814082&r1=814081&r2=814082&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Fri Sep 11 23:33:33 2009 @@ -147,6 +147,38 @@ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } + public void testSkewedJoinWithNoProperties() throws IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);"); + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + DataBag dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join A by (id, name), B by (id, name) using \"skewed\" parallel 5;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("E = join A by(id, name), B by (id, name);"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + 
} + Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + + }catch(Exception e) { + fail(e.getMessage()); + } + } + public void testSkewedJoinReducers() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");