Author: pradeepkth
Date: Wed Sep 2 19:30:42 2009
New Revision: 810677
URL: http://svn.apache.org/viewvc?rev=810677&view=rev
Log:
PIG-935: Skewed join throws an exception when used with map keys (sriranjan via
pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep 2 19:30:42 2009
@@ -60,6 +60,9 @@
BUG FIXES
+ PIG-935: Skewed join throws an exception when used with map keys(sriranjan
+ via pradeepkth)
+
PIG-934: Merge join implementation currently does not seek to right point
on the right side input based on the offset provided by the index
(ashutoshc via pradeepkth)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Wed Sep 2 19:30:42 2009
@@ -1489,9 +1489,16 @@
Pair[] ret = new Pair[plans.size()];
int i=-1;
for (PhysicalPlan plan : plans) {
- if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
- int first = ((POProject)plan.getLeaves().get(0)).getColumn();
- byte second =
((POProject)plan.getLeaves().get(0)).getResultType();
+ PhysicalOperator op = plan.getLeaves().get(0);
+ int first = -1;
+ if (op instanceof POProject) {
+ if (((POProject)op).isStar()) return null;
+ first = ((POProject)op).getColumn();
+ } else {
+ // the plan is not POProject, so we don't know the column
index
+ first = -1;
+ }
+ byte second = plan.getLeaves().get(0).getResultType();
ret[++i] = new Pair<Integer,Byte>(first,second);
}
return ret;
@@ -1705,8 +1712,6 @@
List<PhysicalOperator> l = plan.getPredecessors(op);
List<PhysicalPlan> groups = (List<PhysicalPlan>)joinPlans.get(l.get(0));
-
-
List<Boolean> ascCol = new ArrayList<Boolean>();
for(int i=0; i<groups.size(); i++) {
ascCol.add(false);
@@ -1734,7 +1739,7 @@
ep.add(uf);
ep.add(prjStar);
ep.connect(prjStar, uf);
-
+
transformPlans.add(ep);
try{
@@ -1742,7 +1747,7 @@
String per =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", "0.5");
String mc =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
String inputFile = lFile.getFileName();
-
+
return getSamplingJob(sort, prevJob, transformPlans, lFile,
sampleFile, rp, null,
PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile});
}catch(Exception e) {
@@ -1818,6 +1823,14 @@
for (Pair<Integer,Byte> i : fields) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ // Check for i being equal to -1. -1 is used by
getSortCols for a non POProject
+ // operator. Since Order by does not allow expression
operators, it should never be set to
+ // -1
+ if (i.first == -1) {
+ int errCode = 2174;
+ String msg = "Internal exception. Could not create a
sampler job";
+ throw new MRCompilerException(msg, errCode,
PigException.BUG);
+ }
prj.setColumn(i.first);
prj.setOverloaded(false);
prj.setResultType(i.second);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=810677&r1=810676&r2=810677&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Wed Sep 2
19:30:42 2009
@@ -40,6 +40,7 @@
private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
+ private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
@@ -48,7 +49,7 @@
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
// pigServer = new PigServer(ExecType.LOCAL);
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple",
"5");
-
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage",
"0.1");
+
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage",
"0.01");
}
@Before
@@ -93,9 +94,16 @@
w3.close();
+ PrintWriter w4 = new PrintWriter(new FileWriter(INPUT_FILE4));
+ for(int i=0; i < 100; i++) {
+
w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
+ }
+ w4.close();
+
Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
}
@After
@@ -103,10 +111,12 @@
new File(INPUT_FILE1).delete();
new File(INPUT_FILE2).delete();
new File(INPUT_FILE3).delete();
+ new File(INPUT_FILE4).delete();
Util.deleteFile(cluster, INPUT_FILE1);
Util.deleteFile(cluster, INPUT_FILE2);
Util.deleteFile(cluster, INPUT_FILE3);
+ Util.deleteFile(cluster, INPUT_FILE4);
}
@@ -177,4 +187,26 @@
fail("Should throw exception, do not support 3 way join");
}
+
+ public void testSkewedJoinMapKey() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE4 + "' as (m:[]);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE4 + "' as (n:[]);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by (chararray)m#'a100', B
by (chararray)n#'a100' using \"skewed\" parallel 20;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ }catch(Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ fail("Should support maps and expression operators as keys");
+ }
+
+ return;
+ }
}