Author: daijy
Date: Thu Sep 3 00:36:45 2009
New Revision: 810742
URL: http://svn.apache.org/viewvc?rev=810742&view=rev
Log:
PIG-890: Create a sampler interface and improve the skewed join sampler
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Sep 3 00:36:45 2009
@@ -28,11 +28,13 @@
IMPROVEMENTS
+PIG-890: Create a sampler interface and improve the skewed join sampler (sriranjan via daijy)
+
PIG-922: Logical optimizer: push up project part 1 (daijy)
PIG-812: COUNT(*) does not work (breed)
-PIG-923: Allow specifying log file location through pig.properties (dvryaboy through daijy)
+PIG-923: Allow specifying log file location through pig.properties (dvryaboy via daijy)
PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Thu Sep 3 00:36:45 2009
@@ -26,6 +26,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.zip.GZIPInputStream;
import org.apache.pig.FuncSpec;
@@ -35,10 +36,15 @@
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.SeekableInputStream;
import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
import org.apache.tools.bzip2r.CBZip2InputStream;
/**
@@ -69,9 +75,10 @@
return new String[] { file };
}
- public void init(DataStorage base) throws IOException {
+ @SuppressWarnings("unchecked")
+ public void init(DataStorage base) throws IOException {
if (parser == null) {
- loader = new PigStorage();
+ loader = new PigStorage();
} else {
try {
loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
@@ -95,9 +102,24 @@
end = Long.MAX_VALUE;
} else {
is = fsis;
- }
+ }
+
+ // set the right sample size
+ if (loader instanceof SampleLoader) {
+ try {
+                PigContext pc = (PigContext) ObjectSerializer.deserialize(((HDataStorage)base).getConfiguration().getProperty("pig.pigContext"));
+                ArrayList<Pair<FileSpec, Boolean>> inputs =
+                    (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer.deserialize(((HDataStorage)base).getConfiguration().getProperty("pig.inputs"));
+
+ ((SampleLoader)loader).computeSamples(inputs, pc);
+ } catch (Exception e) {
+ throw new ExecException(e.getMessage());
+ }
+ }
+
        loader.bindTo(file.toString(), new BufferedPositionedInputStream(is, start), start, end);
+
}
public boolean next(Tuple value) throws IOException {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep 3 00:36:45 2009
@@ -39,6 +39,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.builtin.MergeJoinIndexer;
import org.apache.pig.impl.builtin.TupleSize;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
@@ -1699,7 +1700,7 @@
}
}
-        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs);
+        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
}
/**
@@ -1749,7 +1750,7 @@
String inputFile = lFile.getFileName();
             return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,
-                    PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile});
+                    PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
}catch(Exception e) {
throw new PlanException(e);
}
@@ -1776,21 +1777,25 @@
 * @param sortKeyPlans PhysicalPlans to be set into POSort operator to get sorting keys
* @param udfClassName the class name of UDF
* @param udfArgs the arguments of UDF
+ * @param sampleLdrClassName class name for the sample loader
* @return pair<mapreduceoper,integer>
* @throws PlanException
* @throws VisitorException
*/
protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort,
MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
FileSpec lFile, FileSpec sampleFile, int rp,
List<PhysicalPlan> sortKeyPlans,
-            String udfClassName, String[] udfArgs ) throws PlanException, VisitorException {
+            String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
String[] rslargs = new String[2];
- // RandomSampleLoader expects string version of FuncSpec
+ // SampleLoader expects string version of FuncSpec
// as its first constructor argument.
+
rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
- rslargs[1] = "100"; // TODO Needs to be determined based on file size.
+
+        rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
- new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+ new FuncSpec(sampleLdrClassName, rslargs));
+
MapReduceOper mro = startNew(quantLdFilName, prevJob);
if(sort.isUDFComparatorUsed) {
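
To make the wiring above concrete, here is a small standalone sketch (not part of the patch) that builds the same loader FuncSpec a skewed-join sampling job is handed; the printed spec should look roughly like org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.builtin.BinStorage', '100'), though the exact rendering of FuncSpec.toString() may differ.

    import org.apache.pig.FuncSpec;
    import org.apache.pig.builtin.BinStorage;
    import org.apache.pig.impl.builtin.PoissonSampleLoader;

    public class SampleLoaderSpecDemo {
        public static void main(String[] args) {
            // mirrors the rslargs construction in getSamplingJob()
            String[] rslargs = new String[2];
            // first ctor arg: the FuncSpec string of the underlying loader
            rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
            // second ctor arg: a placeholder sample count; PoissonSampleLoader
            // recomputes it via computeSamples() when the slice is initialized
            rslargs[1] = "100";
            FuncSpec spec = new FuncSpec(PoissonSampleLoader.class.getName(), rslargs);
            System.out.println(spec);
        }
    }
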
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Sep 3 00:36:45 2009
@@ -178,7 +178,8 @@
* per DFS block of the input file. Configures the PigSlice
* and returns the list of PigSlices as an array
*/
- public InputSplit[] getSplits(JobConf job, int numSplits)
+ @SuppressWarnings("unchecked")
+ public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
ArrayList<Pair<FileSpec, Boolean>> inputs;
ArrayList<ArrayList<OperatorKey>> inpTargets;
@@ -202,24 +203,24 @@
try {
                                Path path = new Path(inputs.get(i).first.getFileName());
- FileSystem fs;
+ FileSystem fs;
- try {
- fs = path.getFileSystem(job);
- } catch (Exception e) {
- // If an application specific
- // scheme was used
- // (e.g.: "hbase://table") we will fail
- // getting the file system. That's
- // ok, we just use the dfs in that case.
- fs = new Path("/").getFileSystem(job);
- }
+ try {
+ fs = path.getFileSystem(job);
+ } catch (Exception e) {
+ // If an application specific
+ // scheme was used
+ // (e.g.: "hbase://table") we will fail
+ // getting the file system. That's
+ // ok, we just use the dfs in that case.
+ fs = new Path("/").getFileSystem(job);
+ }
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
                                if(pigContext.getExecType() == ExecType.MAPREDUCE) {
                                        fs.setWorkingDirectory(new Path("/user", job.getUser()));
- }
+ }
                                DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
ValidatingInputFileSpec spec;
@@ -244,6 +245,9 @@
                                throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
+ // set the number of map tasks
+        pigContext.getProperties().setProperty("pig.mapsplits.count", Integer.toString(splits.size()));
+ job.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
return splits.toArray(new SliceWrapper[splits.size()]);
}
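
The two values set here are consumed on the backend: PigSlice.init() (above) deserializes pig.pigContext and PoissonSampleLoader.computeSamples() reads pig.mapsplits.count out of it. A minimal sketch of that round trip, using the same ObjectSerializer calls the patch itself uses; constructing a LOCAL PigContext and using a bare Properties object in place of the JobConf are assumptions of the sketch, not part of the patch.

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.util.ObjectSerializer;

    public class ContextRoundTrip {
        public static void main(String[] args) throws Exception {
            PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
            pc.getProperties().setProperty("pig.mapsplits.count", "5");

            // front end (PigInputFormat.getSplits): stash the context as a string
            String wire = ObjectSerializer.serialize(pc);

            // back end (PigSlice.init): recover it and read the split count back
            PigContext back = (PigContext) ObjectSerializer.deserialize(wire);
            System.out.println(back.getProperties().getProperty("pig.mapsplits.count"));
        }
    }
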
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Sep 3 00:36:45 2009
@@ -81,7 +81,7 @@
POLoad load = (POLoad)po;
String loadFunc = load.getLFile().getFuncName();
String loadFile = load.getLFile().getFileName();
-        if (!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc))) {
+        if (!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && !("org.apache.pig.impl.builtin.PoissonSampleLoader".equals(loadFunc))) {
log.debug("Not a sampling job.");
return;
}
@@ -178,7 +178,7 @@
rslargs[0] = predFs.getFuncSpec().toString();
    // Second argument is the number of samples per block, read this from the original.
rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1];
-    FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+    FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs));
     POLoad newLoad = new POLoad(load.getOperatorKey(),load.getRequestedParallelism(), fs, load.isSplittable());
try {
mr.mapPlan.replace(load, newLoad);
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Sep 3 00:36:45 2009
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Currently skipInterval is similar to RandomSampleLoader's. However, if we were to use a
+ * uniform distribution, we could precompute the intervals and read them from a file.
+ *
+ */
+public class PoissonSampleLoader extends SampleLoader {
+
+ // Base number of samples needed
+ private long baseNumSamples;
+
+ /// Count of the map splits
+ private static final String MAPSPLITS_COUNT = "pig.mapsplits.count";
+
+    /// Conversion factor accounts for the various encodings, compression etc.
+    private static final String CONV_FACTOR = "pig.inputfile.conversionfactor";
+
+    /// For a given mean and a confidence, a sample rate is obtained from a Poisson CDF
+ private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
+
+    /// % of memory available for the input data. This is currently equal to the memory available
+    /// for the skewed join
+    private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+    // 17 is not a magic number. It can be obtained by using a Poisson cumulative distribution function with the mean
+    // set to 10 (empirically, the minimum number of samples) and the confidence set to 95%
+    private static final int DEFAULT_SAMPLE_RATE = 17;
+
+ // By default the data is multiplied by 2 to account for the encoding
+ private static final int DEFAULT_CONV_FACTOR = 2;
+
+
+ public PoissonSampleLoader(String funcSpec, String ns) {
+ super(funcSpec);
+ super.setNumSamples(Integer.valueOf(ns)); // will be overridden
+ }
+
+ // n is the number of map tasks
+ @Override
+ public void setNumSamples(int n) {
+ super.setNumSamples(n); // will be overridden
+ }
+
+ /**
+ * Computes the number of samples for the loader
+ *
+ * @param inputs : Set to pig inputs
+ * @param pc : PigContext object
+ *
+ */
+ @Override
+    public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
+ int numSplits, convFactor, sampleRate;
+ Properties pcProps = pc.getProperties();
+
+ // Set default values for the various parameters
+ try {
+            numSplits = Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT));
+        } catch (NumberFormatException e) {
+            String msg = "Couldn't retrieve the number of maps in the job";
+ throw new ExecException(msg);
+ }
+
+ try {
+            convFactor = Integer.valueOf(pcProps.getProperty(CONV_FACTOR));
+ } catch (NumberFormatException e) {
+ convFactor = DEFAULT_CONV_FACTOR;
+ }
+
+ try {
+            sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
+ } catch (NumberFormatException e) {
+ sampleRate = DEFAULT_SAMPLE_RATE;
+ }
+
+ // % of memory available for the records
+        float heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+
+ // we are only concerned with the first input for skewed join
+ String fname = inputs.get(0).first.getFileName();
+
+ // calculate the base number of samples
+ try {
+            float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
+ baseNumSamples = (long) Math.ceil(1.0 / f);
+ } catch (IOException e) {
+ int errCode = 2175;
+ String msg = "Internal error. Could not retrieve file
size for the sampler.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ } catch (ArithmeticException e) {
+ int errCode = 1105;
+ String msg = "Heap percentage / Conversion factor
cannot be set to 0";
+ throw new ExecException(msg,errCode,PigException.INPUT);
+ }
+
+ // set the number of samples
+ int n = (int) ((baseNumSamples * sampleRate) / numSplits);
+
+ // set the minimum number of samples to 1
+ n = (n > 1) ? n : 1;
+ setNumSamples(n);
+ }
+
+
+}
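
For a sense of the arithmetic in computeSamples(), the standalone sketch below (not part of the patch) reproduces both calculations with assumed figures: a 1 GB heap, memusage of 0.3, a 10 GB input, 80 map splits, and the default conversion factor. Note the CDF search lands around 15 for a mean of 10 and 95% confidence; the committed default of 17 presumably includes some extra headroom or a slightly higher confidence.

    public class PoissonSamplingMath {

        // Smallest k such that the Poisson(mean) CDF reaches the confidence;
        // this is the derivation the DEFAULT_SAMPLE_RATE comment alludes to.
        static int sampleRate(double mean, double confidence) {
            double term = Math.exp(-mean); // P(X = 0)
            double cdf = term;
            int k = 0;
            while (cdf < confidence) {
                k++;
                term *= mean / k;          // P(X = k) from P(X = k - 1)
                cdf += term;
            }
            return k;
        }

        public static void main(String[] args) {
            // assumed figures, purely illustrative
            long heapBytes = 1L << 30;     // 1 GB of heap (Runtime.maxMemory())
            float heapPerc = 0.3f;         // pig.skewedjoin.reduce.memusage
            long fileSize = 10L << 30;     // 10 GB input file
            int convFactor = 2;            // DEFAULT_CONV_FACTOR
            int numSplits = 80;            // pig.mapsplits.count

            int rate = sampleRate(10.0, 0.95); // ~15; the patch defaults to 17

            // mirrors computeSamples(): how many heap-fulls the scaled input is
            double f = (heapBytes * heapPerc) / (double) (fileSize * convFactor);
            long baseNumSamples = (long) Math.ceil(1.0 / f);
            long n = (baseNumSamples * rate) / numSplits;
            System.out.println(Math.max(n, 1) + " samples per split");
        }
    }
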
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Sep 3 00:36:45 2009
@@ -17,33 +17,13 @@
*/
package org.apache.pig.impl.builtin;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
/**
* A loader that samples the data. This loader can subsume loader that
* can handle starting in the middle of a record. Loaders that can
* handle this should implement the SamplableLoader interface.
*/
-public class RandomSampleLoader implements LoadFunc {
-
- private int numSamples;
- private long skipInterval;
- private TupleFactory factory;
- protected SamplableLoader loader;
-
+public class RandomSampleLoader extends SampleLoader {
+
/**
* Construct with a class of loader to use.
* @param funcSpec func spec of the loader to use.
@@ -53,107 +33,17 @@
* UDF constructors.
*/
public RandomSampleLoader(String funcSpec, String ns) {
- loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
- numSamples = Integer.valueOf(ns);
+ // instantiate the loader
+ super(funcSpec);
+ // set the number of samples
+ super.setNumSamples(Integer.valueOf(ns));
}
-
-    public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
- skipInterval = (end - offset)/numSamples;
- loader.bindTo(fileName, is, offset, end);
- }
-
- public Tuple getNext() throws IOException {
- long initialPos = loader.getPosition();
-
- // make sure we move to a boundry of a record
- Tuple t = loader.getSampledTuple();
- long middlePos = loader.getPosition();
-
- // we move to next boundry
- t = loader.getSampledTuple();
- long finalPos = loader.getPosition();
-
- long toSkip = skipInterval - (finalPos - initialPos);
- if (toSkip > 0) {
- long rc = loader.skip(toSkip);
-
- // if we did not skip enough
- // in the first attempt, call
- // in.skip() repeatedly till we
- // skip enough
- long remainingSkip = toSkip - rc;
- while(remainingSkip > 0) {
- rc = loader.skip(remainingSkip);
- if(rc == 0) {
- // underlying stream saw EOF
- break;
- }
- remainingSkip -= rc;
- }
- }
-
- if (t == null) {
- return null;
- }
-
- if (factory == null) {
- factory = TupleFactory.getInstance();
- }
-
- // copy existing field
- Tuple m = factory.newTuple(t.size()+1);
- for(int i=0; i<t.size(); i++) {
- m.set(i, t.get(i));
- }
-
- // add size of the tuple at the end
- m.set(t.size(), (finalPos-middlePos));
-
- return m;
- }
-
- public Integer bytesToInteger(byte[] b) throws IOException {
- return loader.bytesToInteger(b);
- }
-
- public Long bytesToLong(byte[] b) throws IOException {
- return loader.bytesToLong(b);
- }
-
- public Float bytesToFloat(byte[] b) throws IOException {
- return loader.bytesToFloat(b);
- }
-
- public Double bytesToDouble(byte[] b) throws IOException {
- return loader.bytesToDouble(b);
- }
-
- public String bytesToCharArray(byte[] b) throws IOException {
- return loader.bytesToCharArray(b);
- }
-
- public Map<String, Object> bytesToMap(byte[] b) throws IOException {
- return loader.bytesToMap(b);
- }
-
- public Tuple bytesToTuple(byte[] b) throws IOException {
- return loader.bytesToTuple(b);
- }
-
- public DataBag bytesToBag(byte[] b) throws IOException {
- return loader.bytesToBag(b);
- }
-
- public void fieldsToRead(Schema schema) {
- loader.fieldsToRead(schema);
- }
-
- public Schema determineSchema(
- String fileName,
- ExecType execType,
- DataStorage storage) throws IOException {
- return loader.determineSchema(fileName, execType, storage);
+ @Override
+ public void setNumSamples(int n) {
+ // Setting it to 100 as default for order by
+ super.setNumSamples(100);
}
+
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Sep 3 00:36:45 2009
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Abstract class that specifies the interface for sample loaders
+ *
+ */
+public abstract class SampleLoader implements LoadFunc {
+
+ protected int numSamples;
+ protected long skipInterval;
+ protected SamplableLoader loader;
+ private TupleFactory factory;
+
+
+ public SampleLoader(String funcSpec) {
+ loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
+ }
+
+ public void setNumSamples(int n) {
+ numSamples = n;
+ }
+
+ public int getNumSamples() {
+ return numSamples;
+ }
+
+ /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
+ */
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ skipInterval = (end - offset)/numSamples;
+ loader.bindTo(fileName, is, offset, end);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
+ */
+ public DataBag bytesToBag(byte[] b) throws IOException {
+ return loader.bytesToBag(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
+ */
+ public String bytesToCharArray(byte[] b) throws IOException {
+ return loader.bytesToCharArray(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
+ */
+ public Double bytesToDouble(byte[] b) throws IOException {
+ return loader.bytesToDouble(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
+ */
+ public Float bytesToFloat(byte[] b) throws IOException {
+ return loader.bytesToFloat(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
+ */
+ public Integer bytesToInteger(byte[] b) throws IOException {
+ return loader.bytesToInteger(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
+ */
+ public Long bytesToLong(byte[] b) throws IOException {
+ return loader.bytesToLong(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
+ */
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+ return loader.bytesToMap(b);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
+ */
+ public Tuple bytesToTuple(byte[] b) throws IOException {
+ return loader.bytesToTuple(b);
+ }
+
+ /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ return loader.determineSchema(fileName, execType, storage);
+ }
+
+ /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+ */
+ public void fieldsToRead(Schema schema) {
+ loader.fieldsToRead(schema);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getNext()
+ */
+ public Tuple getNext() throws IOException {
+ long initialPos = loader.getPosition();
+
+        // make sure we move to a boundary of a record
+ Tuple t = loader.getSampledTuple();
+ long middlePos = loader.getPosition();
+
+        // we move to the next boundary
+ t = loader.getSampledTuple();
+ long finalPos = loader.getPosition();
+
+ long toSkip = skipInterval - (finalPos - initialPos);
+ if (toSkip > 0) {
+ long rc = loader.skip(toSkip);
+
+            // if we did not skip enough
+            // in the first attempt, call
+            // loader.skip() repeatedly until we
+            // skip enough
+ long remainingSkip = toSkip - rc;
+ while(remainingSkip > 0) {
+ rc = loader.skip(remainingSkip);
+ if(rc == 0) {
+ // underlying stream saw EOF
+ break;
+ }
+ remainingSkip -= rc;
+ }
+ }
+
+ if (t == null) {
+ return null;
+ }
+
+ if (factory == null) {
+ factory = TupleFactory.getInstance();
+ }
+
+ // copy existing field
+ Tuple m = factory.newTuple(t.size()+1);
+ for(int i=0; i<t.size(); i++) {
+ m.set(i, t.get(i));
+ }
+
+ // add size of the tuple at the end
+ m.set(t.size(), (finalPos-middlePos));
+
+ return m;
+ }
+
+    public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
+        // default is a no-op; subclasses such as PoissonSampleLoader override this to size their sample
+    }
+
+}
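
The getNext() above is the generic skip-interval sampler both concrete loaders now inherit. The following self-contained sketch (plain lists instead of the LoadFunc/stream plumbing; record layout and figures are assumptions) shows the shape of it: the assigned byte range is carved into numSamples slots, one record is emitted per slot, and the record's size is appended as an extra field so downstream sampling UDFs can estimate row widths.

    import java.util.ArrayList;
    import java.util.List;

    public class SkipIntervalSketch {
        public static void main(String[] args) {
            // stand-in for a split: 1000 newline-terminated records
            List<String> records = new ArrayList<String>();
            for (int i = 0; i < 1000; i++) records.add("key\tvalue-" + i);

            long totalBytes = 0;                  // (end - offset) as seen by bindTo()
            for (String r : records) totalBytes += r.length() + 1;

            int numSamples = 10;
            long skipInterval = totalBytes / numSamples;

            List<String> samples = new ArrayList<String>();
            long pos = 0;
            int idx = 0;
            while (idx < records.size()) {
                long initialPos = pos;
                String t = records.get(idx++);    // the sampled record
                pos += t.length() + 1;
                samples.add(t + "\t" + (t.length() + 1)); // append tuple size
                // skip ahead so the samples stay evenly spread over the range
                while (idx < records.size() && pos - initialPos < skipInterval) {
                    pos += records.get(idx++).length() + 1;
                }
            }
            System.out.println(samples.size() + " samples taken");
        }
    }
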
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Thu Sep 3 00:36:45 2009
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.apache.pig.impl.io.FileSpec;
+
+
+public class TestPoissonSampleLoader extends TestCase{
+ private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
+
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+    public TestPoissonSampleLoader() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
+        pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.0001");
+        pigServer.getPigContext().getProperties().setProperty("mapred.child.java.opts", "-Xmx512m");
+
+        pigServer.getPigContext().getProperties().setProperty("pig.mapsplits.count", "5");
+    }
+
+
+ @Before
+ public void setUp() throws Exception {
+ createFiles();
+ }
+
+ private void createFiles() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE1));
+
+ int k = 0;
+ for(int j=0; j<100; j++) {
+ w.println("100\tapple1\taaa" + k);
+ k++;
+ w.println("200\torange1\tbbb" + k);
+ k++;
+ w.println("300\tstrawberry\tccc" + k);
+ k++;
+ }
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE1).delete();
+
+ Util.deleteFile(cluster, INPUT_FILE1);
+ }
+
+
+ public void testComputeSamples() throws IOException{
+        FileSpec fs = new FileSpec(INPUT_FILE1, new FuncSpec(PigStorage.class.getName()));
+
+        ArrayList<Pair<FileSpec, Boolean>> inputs = new ArrayList<Pair<FileSpec, Boolean>>();
+ inputs.add(new Pair<FileSpec, Boolean>(fs, true));
+
+        // Use 100 as the default value
+        PoissonSampleLoader ps = new PoissonSampleLoader((new FuncSpec(PigStorage.class.getName())).toString(), "100");
+
+ // Get the number of samples for the file
+ ps.computeSamples(inputs, pigServer.getPigContext());
+
+ if (ps.getNumSamples() != 3) {
+ fail("Compute samples returned the wrong number of samples");
+ }
+ }
+
+}
\ No newline at end of file