Author: olga
Date: Sun Sep 20 01:10:57 2009
New Revision: 816978

URL: http://svn.apache.org/viewvc?rev=816978&view=rev
Log:
PIG-964: Handling null in skewed join (sriranjan via olgan)

Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt hadoop/pig/branches/branch-0.4/build.xml hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.4/CHANGES.txt Sun Sep 20 01:10:57 2009 @@ -73,6 +73,10 @@ BUG FIXES + PIG-964: Handling null in skewed join (sriranjan via olgan) + + PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan) + PIG-963: Join in local mode matches null keys (pradeepkth) PIG-957: Tutorial is broken with 0.4 branch and trunk (pradeepkth) Modified: hadoop/pig/branches/branch-0.4/build.xml URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/build.xml?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/build.xml (original) +++ hadoop/pig/branches/branch-0.4/build.xml Sun Sep 20 01:10:57 2009 @@ -493,7 +493,7 @@ <mkdir dir="${dist.dir}/license" /> <copy todir="${dist.dir}/lib" includeEmptyDirs="false"> - <fileset dir="${ivy.lib.dir}"/> + <!--fileset dir="${ivy.lib.dir}"/--> <fileset dir="${lib.dir}"/> </copy> 
Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/PigWarning.java Sun Sep 20 01:10:57 2009 @@ -59,5 +59,6 @@ UNABLE_TO_CLOSE_SPILL_FILE, UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED, USING_OVERLOADED_FUNCTION, + REDUCER_COUNT_LOW, NULL_COUNTER_COUNT; } Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Sun Sep 20 01:10:57 2009 @@ -81,7 +81,7 @@ POLoad load = (POLoad)po; String loadFunc = load.getLFile().getFuncName(); String loadFile = load.getLFile().getFileName(); - if (!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && !("org.apache.pig.impl.builtin.SkewedJoinSampleLoader".equals(loadFunc))) { + if (!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && !("org.apache.pig.impl.builtin.PoissonSampleLoader".equals(loadFunc))) { log.debug("Not a sampling job."); return; } Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Sun Sep 20 01:10:57 2009 @@ -77,20 +77,20 @@ // for partition table, compute the index based on the sampler output Pair <Integer, Integer> indexes; Integer curIndex = -1; - Tuple keyTuple = null; + Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1); // extract the key from nullablepartitionwritable PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey(); - if (key instanceof NullableTuple) { + try { + keyTuple.set(0, key.getValueAsPigType()); + } catch (ExecException e) { + return -1; + } + + // if the key is not null and key + if (key instanceof NullableTuple && key.getValueAsPigType() != null) { keyTuple = (Tuple)key.getValueAsPigType(); - } else { - keyTuple = DefaultTupleFactory.getInstance().newTuple(1); - try { - keyTuple.set(0, key.getValueAsPigType()); - } catch (ExecException e) { - return -1; - } } indexes = reducerMap.get(keyTuple); Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Sun Sep 20 
01:10:57 2009 @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.EvalFunc; +import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; @@ -198,8 +199,13 @@ } if (maxReducers > totalReducers_) { - throw new RuntimeException("You need at least " + maxReducers - + " reducers to run this job."); + if(pigLogger != null) { + pigLogger.warn(this,"You need at least " + maxReducers + + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW); + } else { + log.warn("You need at least " + maxReducers + + " reducers to avoid spillage and run this job efficiently."); + } } output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList)); Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/impl/builtin/SampleLoader.java Sun Sep 20 01:10:57 2009 @@ -152,7 +152,7 @@ // we move to next boundry t = loader.getSampledTuple(); long finalPos = loader.getPosition(); - + long toSkip = skipInterval - (finalPos - initialPos); if (toSkip > 0) { long rc = loader.skip(toSkip); @@ -187,7 +187,7 @@ } // add size of the tuple at the end - m.set(t.size(), (finalPos-middlePos)); + m.set(t.size(), (finalPos-middlePos) + 1); // offset 1 for null return m; } Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java (original) +++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSampleOptimizer.java Sun Sep 20 01:10:57 2009 @@ -180,4 +180,36 @@ } tmpFile.delete(); } + + @Test + public void testPoissonSampleOptimizer() throws Exception { + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan(" A = load 'input' using PigStorage('\t');"); + planTester.buildPlan("B = load 'input' using PigStorage('\t');"); + planTester.buildPlan(" C = join A by $0, B by $0 using \"skewed\";"); + LogicalPlan lp = planTester.buildPlan("store C into 'output';"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + int count = 1; + MapReduceOper mrOper = mrPlan.getRoots().get(0); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + ++count; + } + // Before optimizer visits, number of MR jobs = 3. 
+ assertEquals(3,count); + + SampleOptimizer so = new SampleOptimizer(mrPlan); + so.visit(); + + count = 1; + mrOper = mrPlan.getRoots().get(0); + while(mrPlan.getSuccessors(mrOper) != null) { + mrOper = mrPlan.getSuccessors(mrOper).get(0); + ++count; + } + // After optimizer visits, number of MR jobs = 2 + assertEquals(2,count); + } } Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=816978&r1=816977&r2=816978&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Sun Sep 20 01:10:57 2009 @@ -41,6 +41,7 @@ private static final String INPUT_FILE2 = "SkewedJoinInput2.txt"; private static final String INPUT_FILE3 = "SkewedJoinInput3.txt"; private static final String INPUT_FILE4 = "SkewedJoinInput4.txt"; + private static final String INPUT_FILE5 = "SkewedJoinInput5.txt"; private PigServer pigServer; private MiniCluster cluster = MiniCluster.buildCluster(); @@ -99,11 +100,26 @@ w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]"); } w4.close(); - + + // Create a file with null keys + PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_FILE5)); + for(int i=0; i < 10; i++) { + w5.println("\tapple1"); + } + w5.println("100\tapple2"); + for(int i=0; i < 10; i++) { + w5.println("\torange1"); + } + w5.println("\t"); + w5.println("100\t"); + w5.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1); Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2); Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3); Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4); + Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5); 
+ } @After @@ -118,6 +134,8 @@ Util.deleteFile(cluster, INPUT_FILE2); Util.deleteFile(cluster, INPUT_FILE3); Util.deleteFile(cluster, INPUT_FILE4); + Util.deleteFile(cluster, INPUT_FILE5); + } @@ -194,10 +212,9 @@ } } }catch(Exception e) { - return; + fail("Should not throw exception, should continue execution"); } - fail("Should throw exception, not enough reducers"); } public void testSkewedJoin3Way() throws IOException{ @@ -286,4 +303,25 @@ } } + public void testSkewedJoinNullKeys() throws IOException { + pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);"); + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join A by id, B by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + } catch(Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + fail("Should support null keys in skewed join"); + } + return; + } + }