Author: olga Date: Mon Dec 21 19:32:26 2009 New Revision: 892939 URL: http://svn.apache.org/viewvc?rev=892939&view=rev Log: PIG-1143: Poisson Sample Loader should compute the number of samples required only once (sriranjan via olgan)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java hadoop/pig/trunk/test/org/apache/pig/test/Util.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Dec 21 19:32:26 2009 @@ -65,6 +65,9 @@ BUG FIXES +PIG-1143: Poisson Sample Loader should compute the number of samples required +only once (sriranjan via olgan) + PIG-1157: Sucessive replicated joins do not generate Map Reduce plan and fails due to OOM (rding via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Dec 21 19:32:26 2009 @@ -1845,7 +1845,7 @@ String inputFile = lFile.getFileName(); return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null, - PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, 
RandomSampleLoader.class.getName()); + PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName()); }catch(Exception e) { throw new PlanException(e); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Dec 21 19:32:26 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,11 +51,14 @@ import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.SampleLoader; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.io.ValidatingInputFileSpec; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UDFContext; public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable { @@ -239,6 +243,20 @@ if ((spec.getSlicer() instanceof PigSlicer)) { ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable); } + + /* Set the input size in UDF Context if LoadFunc is a sample loader. 
+ * This value is used by PoissonSampleLoader to calculate the number of + * samplable tuples + */ + Object loader = PigContext.instantiateFuncFromSpec(spec.getFuncSpec()); + if (loader instanceof SampleLoader) { + Long iSize = FileLocalizer.getSize(spec.getFileName(), pigContext.getProperties()); + UDFContext udfc = UDFContext.getUDFContext(); + Properties p = udfc.getUDFProperties(SampleLoader.class); + p.setProperty("pig.input." + i + ".size", Long.toString(iSize)); + udfc.serialize(job); + } + Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName()); for (Slice split : pigs) { splits.add(new SliceWrapper(split, pigContext.getExecType(), i, fs, inpTargets.get(i))); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Mon Dec 21 19:32:26 2009 @@ -28,6 +28,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashSet; +import java.util.Properties; import java.util.Set; import java.util.List; @@ -51,6 +52,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.SampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; @@ -141,6 +143,8 @@ wrapped.init(store); + udfc.serialize(job); + job.set("map.target.ops", ObjectSerializer.serialize(targetOps)); // Mimic org.apache.hadoop.mapred.FileSplit if feasible... 
String[] locations = wrapped.getLocations(); Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Mon Dec 21 19:32:26 2009 @@ -32,6 +32,7 @@ import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.builtin.PartitionSkewedKeys; /** @@ -85,6 +86,7 @@ */ @Override public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException { + int numSplits, convFactor, sampleRate; Properties pcProps = pc.getProperties(); @@ -110,25 +112,23 @@ // % of memory available for the records float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; - if (pcProps.getProperty(PERC_MEM_AVAIL) != null) { - try { - heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); - }catch(NumberFormatException e) { - // ignore, use default value - } - } - - // we are only concerned with the first input for skewed join - String fname = inputs.get(0).first.getFileName(); - - // calculate the base number of samples + if (pcProps.getProperty(PERC_MEM_AVAIL) != null) { + try { + heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); + } catch (NumberFormatException e) { + // ignore, use default value + } + } + + // We are concerned with the size of the first input. 
In case of globs / directories, + // this size is the total size of all the files present in them + Properties p = UDFContext.getUDFContext().getUDFProperties(SampleLoader.class); + Long iSize = Long.valueOf((String) p.get("pig.input.0.size")); + + // calculate the base number of samples try { - float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor); + float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (iSize * convFactor); baseNumSamples = (long) Math.ceil(1.0 / f); - } catch (IOException e) { - int errCode = 2175; - String msg = "Internal error. Could not retrieve file size for the sampler."; - throw new ExecException(msg, errCode, PigException.BUG); } catch (ArithmeticException e) { int errCode = 1105; String msg = "Heap percentage / Conversion factor cannot be set to 0"; @@ -141,6 +141,7 @@ // set the minimum number of samples to 1 n = (n > 1) ? n : 1; setNumSamples(n); + } @Override Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Mon Dec 21 19:32:26 2009 @@ -30,18 +30,24 @@ import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.PoissonSampleLoader; +import org.apache.pig.impl.builtin.SampleLoader; 
import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.test.utils.TestHelper; import org.junit.After; import org.junit.Before; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import java.util.Properties; + public class TestPoissonSampleLoader extends TestCase{ private static final String INPUT_FILE1 = "SkewedJoinInput1.txt"; @@ -96,6 +102,8 @@ ArrayList<Pair<FileSpec, Boolean>> inputs = new ArrayList<Pair<FileSpec, Boolean> >(); inputs.add(new Pair<FileSpec, Boolean>(fs, true)); + Properties p = UDFContext.getUDFContext().getUDFProperties(SampleLoader.class); + p.setProperty("pig.input.0.size", Long.toString(Util.getSize(cluster, INPUT_FILE1))); // Use 100 as a default value; PoissonSampleLoader ps = new PoissonSampleLoader((new FuncSpec(PigStorage.class.getName())).toString(), "100"); @@ -104,7 +112,7 @@ ps.computeSamples(inputs, pigServer.getPigContext()); if (ps.getNumSamples() != 3) { - fail("Compute samples returned the wrong number of samples"); + fail("Compute samples returned the wrong number of samples: " + ps.getNumSamples() + " instead of 3"); } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Dec 21 19:32:26 2009 @@ -24,6 +24,10 @@ import junit.framework.Assert; import junit.framework.TestCase; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.SimpleLayout; import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; @@ -444,4 +448,61 @@ Assert.assertEquals(true, 
TestHelper.compareBags(dbfrj, dbrj)); } + + /* Test to check if the samplers sample different input files in the case of + * serial successive joins + */ + public void testSuccessiveJoins() throws IOException { + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id,name);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id,name);"); + pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as (id,name);"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join A by id, B by id using \"skewed\";"); + pigServer.registerQuery("E = join D by A::id, C by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join A by id, B by id;"); + pigServer.registerQuery("E = join D by A::id, C by id;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbrj.add(iter.next()); + } + } + Assert.assertEquals(dbfrj.size(), dbrj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj)); + } + + public void testMultiQuery() throws IOException { + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id,name);"); + pigServer.registerQuery("B = FILTER A by id == 100;"); + pigServer.registerQuery("C = FILTER A by id == 200;"); + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join B by id, C by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join B by id, C by id;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbrj.add(iter.next()); + } + } + Assert.assertEquals(dbfrj.size(), dbrj.size()); + Assert.assertEquals(true, 
TestHelper.compareBags(dbfrj, dbrj)); + } + } Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=892939&r1=892938&r2=892939&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon Dec 21 19:32:26 2009 @@ -242,6 +242,23 @@ } /** + * Helper to return the file size on the MiniCluster dfs. + * + * @param miniCluster reference to the MiniCluster whose file system holds the file + * @param fileName pathname of the file whose size is returned + * @throws IOException if the file does not exist on the minicluster + */ + static public long getSize(MiniCluster miniCluster, String fileName) + throws IOException { + FileSystem fs = miniCluster.getFileSystem(); + Path p = new Path(fileName); + if(!fs.exists(p)) { + throw new IOException("File " + fileName + " does not exist on the minicluster"); + } + return fs.getFileStatus(p).getLen(); + } + + /** * Helper to create a dfs file on the MiniCluster dfs. This returns an * outputstream that can be used in test cases to write data. *