Author: olga
Date: Tue Dec 22 18:22:20 2009
New Revision: 893259
URL: http://svn.apache.org/viewvc?rev=893259&view=rev
Log:
PIG-1143: Poisson Sample Loader should compute the number of samples required
only once (sriranjan via olgan)
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPoissonSampleLoader.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Dec 22 18:22:20 2009
@@ -141,6 +141,9 @@
BUG FIXES
+PIG-1143: Poisson Sample Loader should compute the number of samples required
+only once (sriranjan via olgan)
+
PIG-1135: skewed join partitioner returns negative partition index (yinghe
via olgan)
PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via
olgan)
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Tue Dec 22 18:22:20 2009
@@ -1401,6 +1401,7 @@
}
FileSpec fSpec = getTempFileSpec();
+        System.err.println("#...@#@ fSpec: " + fSpec.getFileName());
MapReduceOper mro = compiledInputs[0];
POStore str = getStore();
str.setSFile(fSpec);
@@ -1845,7 +1846,7 @@
             String inputFile = lFile.getFileName();
             return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,
-                    PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, RandomSampleLoader.class.getName());
+                    PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
}catch(Exception e) {
throw new PlanException(e);
}
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
Tue Dec 22 18:22:20 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,11 +51,14 @@
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.ValidatingInputFileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
public class PigInputFormat implements InputFormat<Text, Tuple>,
JobConfigurable {
@@ -239,6 +243,20 @@
if ((spec.getSlicer() instanceof PigSlicer)) {
((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
}
+
+            /* Set the input size in UDF Context if LoadFunc is a sample loader.
+             * This value is used by PoissonSampleLoader to calculate the number of
+             * samplable tuples
+             */
+            Object loader = PigContext.instantiateFuncFromSpec(spec.getFuncSpec());
+            if (loader instanceof SampleLoader) {
+                Long iSize = FileLocalizer.getSize(spec.getFileName(), pigContext.getProperties());
+                UDFContext udfc = UDFContext.getUDFContext();
+                Properties p = udfc.getUDFProperties(SampleLoader.class);
+                p.setProperty("pig.input." + i + ".size", Long.toString(iSize));
+                udfc.serialize(job);
+            }
+
Slice[] pigs = spec.getSlicer().slice(store,
spec.getFileName());
for (Slice split : pigs) {
splits.add(new SliceWrapper(split,
pigContext.getExecType(), i, fs, inpTargets.get(i)));
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
Tue Dec 22 18:22:20 2009
@@ -28,6 +28,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
import java.util.List;
@@ -51,6 +52,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
@@ -141,6 +143,8 @@
wrapped.init(store);
+ udfc.serialize(job);
+
job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
// Mimic org.apache.hadoop.mapred.FileSplit if feasible...
String[] locations = wrapped.getLocations();
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
Tue Dec 22 18:22:20 2009
@@ -32,6 +32,7 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
/**
@@ -85,6 +86,7 @@
*/
@Override
public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs,
PigContext pc) throws ExecException {
+
int numSplits, convFactor, sampleRate;
Properties pcProps = pc.getProperties();
@@ -110,25 +112,23 @@
// % of memory available for the records
float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-        if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
-            try {
-                heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
-            }catch(NumberFormatException e) {
-                // ignore, use default value
-            }
-        }
-
-        // we are only concerned with the first input for skewed join
-        String fname = inputs.get(0).first.getFileName();
-
-        // calculate the base number of samples
+        if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+            try {
+                heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+            } catch (NumberFormatException e) {
+                // ignore, use default value
+            }
+        }
+
+        // We are concerned with the size of the first input. In case of globs / directories,
+        // this size is the total size of all the files present in them
+        Properties p = UDFContext.getUDFContext().getUDFProperties(SampleLoader.class);
+        Long iSize = Long.valueOf((String) p.get("pig.input.0.size"));
+
+        // calculate the base number of samples
         try {
-            float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
+            float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (iSize * convFactor);
             baseNumSamples = (long) Math.ceil(1.0 / f);
-        } catch (IOException e) {
-            int errCode = 2175;
-            String msg = "Internal error. Could not retrieve file size for the sampler.";
-            throw new ExecException(msg, errCode, PigException.BUG);
         } catch (ArithmeticException e) {
             int errCode = 1105;
             String msg = "Heap percentage / Conversion factor cannot be set to 0";
@@ -141,6 +141,7 @@
// set the minimum number of samples to 1
n = (n > 1) ? n : 1;
setNumSamples(n);
+
}
@Override
Modified:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPoissonSampleLoader.java
(original)
+++
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPoissonSampleLoader.java
Tue Dec 22 18:22:20 2009
@@ -30,18 +30,24 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.builtin.SampleLoader;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.Before;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import java.util.Properties;
+
public class TestPoissonSampleLoader extends TestCase{
private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
@@ -96,6 +102,8 @@
         ArrayList<Pair<FileSpec, Boolean>> inputs = new ArrayList<Pair<FileSpec, Boolean> >();
         inputs.add(new Pair<FileSpec, Boolean>(fs, true));
+        Properties p = UDFContext.getUDFContext().getUDFProperties(SampleLoader.class);
+        p.setProperty("pig.input.0.size", Long.toString(Util.getSize(cluster, INPUT_FILE1)));
         // Use 100 as a default value;
         PoissonSampleLoader ps = new PoissonSampleLoader((new FuncSpec(PigStorage.class.getName())).toString(), "100");
@@ -104,7 +112,7 @@
ps.computeSamples(inputs, pigServer.getPigContext());
         if (ps.getNumSamples() != 3) {
-            fail("Compute samples returned the wrong number of samples");
+            fail("Compute samples returned the wrong number of samples: " + ps.getNumSamples() + " instead of 3");
         }
}
Modified:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java
(original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java
Tue Dec 22 18:22:20 2009
@@ -24,6 +24,10 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -444,4 +448,61 @@
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
}
+
+    /* Test to check if the samplers sample different input files in the case of
+     * serial successive joins
+     */
+    public void testSuccessiveJoins() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id,name);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A by id, B by id using \"skewed\";");
+            pigServer.registerQuery("E = join D by A::id, C by id using \"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A by id, B by id;");
+            pigServer.registerQuery("E = join D by A::id, C by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+    }
+
+
+    public void testMultiQuery() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id,name);");
+        pigServer.registerQuery("B = FILTER A by id == 100;");
+        pigServer.registerQuery("C = FILTER A by id == 200;");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join B by id, C by id using \"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join B by id, C by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+    }
+
+
}
Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/Util.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/Util.java?rev=893259&r1=893258&r2=893259&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/Util.java Tue Dec
22 18:22:20 2009
@@ -242,6 +242,23 @@
}
/**
+     * Helper to return the file size on the MiniCluster dfs.
+     *
+     * @param miniCluster reference to the Minicluster where the file should be created
+     * @param fileName pathname of the file to be created
+     * @throws IOException
+     */
+    static public long getSize(MiniCluster miniCluster, String fileName) throws IOException {
+        FileSystem fs = miniCluster.getFileSystem();
+        Path p = new Path(fileName);
+        if(!fs.exists(p)) {
+            throw new IOException("File " + fileName + " does not exist on the minicluster");
+        }
+        return fs.getFileStatus(p).getLen();
+    }
+
+ /**
* Helper to create a dfs file on the MiniCluster dfs. This returns an
* outputstream that can be used in test cases to write data.
*