Author: olga
Date: Fri Sep 11 23:20:52 2009
New Revision: 814075
URL: http://svn.apache.org/viewvc?rev=814075&view=rev
Log:
PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
configured (yinghe via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Sep 11 23:20:52 2009
@@ -83,6 +83,9 @@
BUG FIXES
+ PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
+ configured(yinghe via olgan)
+
PIG-882: log level not propogated to loggers - duplicate message (daijy)
PIG-943: Pig crash when it cannot get counter from hadoop (daijy)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Fri Sep 11 23:20:52 2009
@@ -1745,7 +1745,8 @@
try{
// pass configurations to the User Function
- String per =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", "0.5");
+ String per =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
+
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
String mc =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
String inputFile = lFile.getFileName();
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
Fri Sep 11 23:20:52 2009
@@ -65,6 +65,8 @@
public static final String TOTAL_REDUCERS = "totalreducers";
+ public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+
private Log log = LogFactory.getLog(getClass());
BagFactory mBagFactory = BagFactory.getInstance();
@@ -104,7 +106,7 @@
tupleMCount_ = Integer.parseInt(args[1]);
inputFile_ = args[2];
} else {
- heapPercentage_ = 0.5;
+ heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
}
if (log.isDebugEnabled()) {
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
Fri Sep 11 23:20:52 2009
@@ -30,6 +30,7 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
/**
* Currently skipInterval is similar to the randomsampleloader. However, if we
were to use an
@@ -106,7 +107,14 @@
}
// % of memory available for the records
- float heapPerc =
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+ float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+ if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+ try {
+ heapPerc =
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+ }catch(NumberFormatException e) {
+ // ignore, use default value
+ }
+ }
// we are only concerned with the first input for skewed join
String fname = inputs.get(0).first.getFileName();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=814075&r1=814074&r2=814075&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Sep 11
23:20:52 2009
@@ -147,6 +147,38 @@
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
+ public void testSkewedJoinWithNoProperties() throws IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name,
n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id,
name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by (id, name), B by (id,
name) using \"skewed\" parallel 5;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("E = join A by(id, name), B by (id,
name);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ }catch(Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
public void testSkewedJoinReducers() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name,
n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id,
name);");