Author: olga
Date: Fri Sep 18 04:55:49 2009
New Revision: 816470
URL: http://svn.apache.org/viewvc?rev=816470&view=rev
Log:
PIG-964: Handling null in skewed join (sriranjan via olgan)
Modified:
hadoop/pig/branches/branch-0.5/CHANGES.txt
hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java
hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java
hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/branches/branch-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/CHANGES.txt?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.5/CHANGES.txt Fri Sep 18 04:55:49 2009
@@ -85,6 +85,8 @@
PIG-792: skew join implementation (sriranjan via olgan)
BUG FIXES
+ PIG-964: Handling null in skewed join (sriranjan via olgan)
+
PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)
PIG-957: Tutorial is broken with 0.4 branch and trunk (pradeepkth)
Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/PigWarning.java Fri Sep 18 04:55:49 2009
@@ -59,5 +59,6 @@
UNABLE_TO_CLOSE_SPILL_FILE,
UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
USING_OVERLOADED_FUNCTION,
+ REDUCER_COUNT_LOW,
NULL_COUNTER_COUNT;
}
Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Sep 18 04:55:49 2009
@@ -77,20 +77,20 @@
// for partition table, compute the index based on the sampler
output
Pair <Integer, Integer> indexes;
Integer curIndex = -1;
- Tuple keyTuple = null;
+ Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
// extract the key from nullablepartitionwritable
PigNullableWritable key = ((NullablePartitionWritable)
wrappedKey).getKey();
- if (key instanceof NullableTuple) {
+ try {
+ keyTuple.set(0, key.getValueAsPigType());
+ } catch (ExecException e) {
+ return -1;
+ }
+
+ // if the key is not null and key
+ if (key instanceof NullableTuple && key.getValueAsPigType() !=
null) {
keyTuple = (Tuple)key.getValueAsPigType();
- } else {
- keyTuple =
DefaultTupleFactory.getInstance().newTuple(1);
- try {
- keyTuple.set(0, key.getValueAsPigType());
- } catch (ExecException e) {
- return -1;
- }
}
indexes = reducerMap.get(keyTuple);
Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri Sep 18 04:55:49 2009
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -198,8 +199,13 @@
}
if (maxReducers > totalReducers_) {
- throw new RuntimeException("You need at least "
+ maxReducers
- + " reducers to run this job.");
+ if(pigLogger != null) {
+ pigLogger.warn(this,"You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job
efficiently.", PigWarning.REDUCER_COUNT_LOW);
+ } else {
+ log.warn("You need at least " + maxReducers
+ + " reducers to avoid spillage
and run this job efficiently.");
+ }
}
output.put(PARTITION_LIST,
mBagFactory.newDefaultBag(reducerList));
Modified: hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/branch-0.5/src/org/apache/pig/impl/builtin/SampleLoader.java Fri Sep 18 04:55:49 2009
@@ -152,7 +152,7 @@
// we move to next boundry
t = loader.getSampledTuple();
long finalPos = loader.getPosition();
-
+
long toSkip = skipInterval - (finalPos - initialPos);
if (toSkip > 0) {
long rc = loader.skip(toSkip);
@@ -187,7 +187,7 @@
}
// add size of the tuple at the end
- m.set(t.size(), (finalPos-middlePos));
+ m.set(t.size(), (finalPos-middlePos) + 1); // offset 1 for null
return m;
}
Modified: hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java?rev=816470&r1=816469&r2=816470&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/branch-0.5/test/org/apache/pig/test/TestSkewedJoin.java Fri Sep 18 04:55:49 2009
@@ -41,6 +41,7 @@
private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
+ private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
@@ -99,11 +100,26 @@
w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
}
w4.close();
-
+
+ // Create a file with null keys
+ PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_FILE5));
+ for(int i=0; i < 10; i++) {
+ w5.println("\tapple1");
+ }
+ w5.println("100\tapple2");
+ for(int i=0; i < 10; i++) {
+ w5.println("\torange1");
+ }
+ w5.println("\t");
+ w5.println("100\t");
+ w5.close();
+
Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
+
}
@After
@@ -118,6 +134,8 @@
Util.deleteFile(cluster, INPUT_FILE2);
Util.deleteFile(cluster, INPUT_FILE3);
Util.deleteFile(cluster, INPUT_FILE4);
+ Util.deleteFile(cluster, INPUT_FILE5);
+
}
@@ -194,10 +212,9 @@
}
}
}catch(Exception e) {
- return;
+ fail("Should not throw exception, should continue execution");
}
- fail("Should throw exception, not enough reducers");
}
public void testSkewedJoin3Way() throws IOException{
@@ -286,4 +303,25 @@
}
}
+ public void testSkewedJoinNullKeys() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as
(id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as
(id,name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id, B by id using
\"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ } catch(Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ fail("Should support null keys in skewed join");
+ }
+ return;
+ }
+
}