Author: gates
Date: Tue Jun 30 01:10:29 2009
New Revision: 789523
URL: http://svn.apache.org/viewvc?rev=789523&view=rev
Log:
PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending
BinStorage. Added a new SamplableLoader interface for loaders to implement,
allowing them to be used by RandomSampleLoader.
Added:
hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 30 01:10:29 2009
@@ -26,6 +26,10 @@
IMPROVEMENTS
+PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending
+ BinStorage. Added a new SamplableLoader interface for loaders to implement,
+ allowing them to be used by RandomSampleLoader (ashutoshc via gates).
+
PIG-832: Make import list configurable (daijy)
PIG-697: Proposed improvements to pig's optimizer (sms)
Added: hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java?rev=789523&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java Tue Jun 30 01:10:29 2009
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+/**
+ * Implementing this interface indicates to Pig that a given loader can be
+ * used by a sampling loader. The requirement for this is that the loader
+ * can handle a getNext() call without knowing the position in the file.
+ * This will not be the case for loaders that handle structured data such
+ * as XML where they must start at the beginning of the file in order to
+ * understand their position. Record-oriented loaders such as PigStorage
+ * can handle this by seeking to the next record delimiter and starting
+ * from that point. Another requirement is that the loader be able to
+ * skip or seek in its input stream.
+ */
+public interface SamplableLoader extends LoadFunc {
+
+ /**
+ * Skip ahead in the input stream.
+ * @param n number of bytes to skip
+ * @return number of bytes actually skipped, which may be fewer than
+ * {@code n}. The return semantics are exactly the same as
+ * {@link java.io.InputStream#skip(long)}.
+ * @throws IOException if an I/O error occurs while skipping
+ */
+ public long skip(long n) throws IOException;
+
+ /**
+ * Get the current position in the stream.
+ * @return position in the stream.
+ * @throws IOException if the position cannot be obtained
+ */
+ public long getPosition() throws IOException;
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jun 30 01:10:29 2009
@@ -507,6 +507,13 @@
}
+ /**
+ * Force an end to the current map reduce job with a store into a temporary
+ * file.
+ * @param fSpec Temp file to force a store into.
+ * @return MR operator that now is finished with a store.
+ * @throws PlanException
+ */
private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws
PlanException{
if(compiledInputs.length>1) {
int errCode = 2023;
@@ -1231,8 +1238,20 @@
return mro;
}
- public Pair<MapReduceOper,Integer> getQuantileJob(POSort inpSort,
MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp,
Pair<Integer,Byte>[] fields) throws PlanException, VisitorException {
- FileSpec quantLdFilName = new FileSpec(lFile.getFileName(), new
FuncSpec(RandomSampleLoader.class.getName()));
+ public Pair<MapReduceOper,Integer> getQuantileJob(
+ POSort inpSort,
+ MapReduceOper prevJob,
+ FileSpec lFile,
+ FileSpec quantFile,
+ int rp,
+ Pair<Integer,Byte>[] fields) throws PlanException,
VisitorException {
+ String[] rslargs = new String[2];
+ // RandomSampleLoader expects string version of FuncSpec
+ // as its first constructor argument.
+ rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
+ rslargs[1] = "100"; // TODO Needs to be determined based on file size.
+ FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
+ new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
MapReduceOper mro = startNew(quantLdFilName, prevJob);
POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
.getRequestedParallelism(), null, inpSort.getSortPlans(),
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jun 30 01:10:29 2009
@@ -315,6 +315,11 @@
co.getMessageCollector().logMessages(MessageType.Warning,
aggregateWarning, log);
}
+ // Optimize the jobs that have a load/store only first MR job followed
+ // by a sample job.
+ SampleOptimizer so = new SampleOptimizer(plan);
+ so.visit();
+
// optimize key - value handling in package
POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
pkgAnnotator.visit();
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=789523&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Tue Jun 30 01:10:29 2009
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to optimize plans that have a sample job that immediately follows a
+ * load/store only MR job. These kinds of plans are generated for order bys, and
+ * will soon be generated for joins that need to sample their data first. These
+ * can be changed so that the RandomSampleLoader subsumes the loader used in the
+ * first job and then removes the first job.
+ * <p>
+ * A sampling job is rewritten only when all of the following hold:
+ * <ul>
+ * <li>its map plan is rooted at a RandomSampleLoader load;</li>
+ * <li>it has exactly one predecessor, which is a root of the plan, has empty
+ * combine and reduce plans, and has exactly two operators in its map plan,
+ * rooted at a single load;</li>
+ * <li>the predecessor's load function implements {@link SamplableLoader};</li>
+ * <li>it has exactly one successor, whose map plan loads the sampled file with
+ * BinStorage.</li>
+ * </ul>
+ */
+public class SampleOptimizer extends MROpPlanVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    /**
+     * @param plan the MR plan to optimize; it is modified in place by
+     * {@link #visit()}.
+     */
+    public SampleOptimizer(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    // Predecessor jobs made redundant by the rewrite. They are removed only
+    // after the walk completes, because the walker is still traversing the plan.
+    private List<MapReduceOper> opsToRemove = new ArrayList<MapReduceOper>();
+
+    /**
+     * Walks the plan, rewriting every eligible sampling job, and then removes
+     * the jobs that the rewrites made redundant.
+     */
+    @Override
+    public void visit() throws VisitorException {
+        super.visit();
+        // remove operators marked for removal
+        for (MapReduceOper op : opsToRemove) {
+            this.mPlan.remove(op);
+        }
+        // Forget the removed operators so that visiting again does not try
+        // to remove them a second time.
+        opsToRemove.clear();
+    }
+
+    /**
+     * If {@code mr} is an eligible sampling job, makes its RandomSampleLoader
+     * wrap the predecessor's load function and read the predecessor's input,
+     * points the successor's BinStorage load at that same input, and marks the
+     * predecessor for removal. Jobs that do not qualify are left untouched;
+     * all checks happen before the plan is modified.
+     */
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // See if this is a sampling job.
+        List<PhysicalOperator> pos = mr.mapPlan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            log.debug("Map of operator empty");
+            return;
+        }
+        PhysicalOperator po = pos.get(0);
+        if (!(po instanceof POLoad)) {
+            log.debug("Root operator of map is not load.");
+            return; // Huh?
+        }
+        POLoad load = (POLoad)po;
+        String loadFunc = load.getLFile().getFuncName();
+        String loadFile = load.getLFile().getFileName();
+        if (!RandomSampleLoader.class.getName().equals(loadFunc)) {
+            log.debug("Not a sampling job.");
+            return;
+        }
+        if (loadFile == null) {
+            log.debug("No load file");
+            return;
+        }
+
+        // The sampler's second constructor argument (samples per block) is
+        // carried over to the rewritten sampler below, so make sure it is
+        // present before touching the plan.
+        String[] samplerArgs = load.getLFile().getFuncSpec().getCtorArgs();
+        if (samplerArgs == null || samplerArgs.length < 2) {
+            log.debug("Sampling loader does not specify the number of samples.");
+            return;
+        }
+
+        // Get this job's predecessor. There should be exactly one.
+        // getPredecessors may return null when there are none.
+        List<MapReduceOper> preds = mPlan.getPredecessors(mr);
+        if (preds == null || preds.size() != 1) {
+            log.debug("Sampling job does not have exactly one predecessor.");
+            return;
+        }
+        MapReduceOper pred = preds.get(0);
+
+        // The predecessor should be a root.
+        List<MapReduceOper> predPreds = mPlan.getPredecessors(pred);
+        if (predPreds != null && predPreds.size() > 0) {
+            log.debug("Predecessor should be a root of the plan");
+            return;
+        }
+
+        // The predecessor should have just a load and store in the map, and
+        // nothing in the combine or reduce.
+        if (!(pred.reducePlan.isEmpty() && pred.combinePlan.isEmpty())) {
+            log.debug("Predecessor has a combine or reduce plan");
+            return;
+        }
+
+        if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
+            log.debug("Predecessor has more than just load+store in the map");
+            return;
+        }
+
+        List<PhysicalOperator> loads = pred.mapPlan.getRoots();
+        if (loads == null || loads.size() != 1) {
+            log.debug("Predecessor plan does not have exactly one root.");
+            return;
+        }
+        PhysicalOperator r = loads.get(0);
+        if (!(r instanceof POLoad)) { // Huh?
+            log.debug("Predecessor's map plan root is not a load.");
+            return;
+        }
+        POLoad predLoad = (POLoad)r;
+        LoadFunc lf = (LoadFunc)PigContext.instantiateFuncFromSpec(predLoad.getLFile().getFuncSpec());
+        if (!(lf instanceof SamplableLoader)) {
+            log.debug("Predecessor's loader does not implement SamplableLoader");
+            return;
+        }
+
+        // The MR job should have one successor. getSuccessors may return
+        // null when there are none.
+        List<MapReduceOper> succs = mPlan.getSuccessors(mr);
+        if (succs == null || succs.size() != 1) {
+            log.debug("Sampling job does not have exactly one successor.");
+            return;
+        }
+        MapReduceOper succ = succs.get(0);
+
+        // Find the load that correlates with the file the sampler is loading,
+        // and check that it is using BinStorage.
+        if (succ.mapPlan == null) { // Huh?
+            log.debug("Successor has no map plan.");
+            return;
+        }
+        loads = succ.mapPlan.getRoots();
+        if (loads == null) {
+            log.debug("Successor's map plan has no roots.");
+            return;
+        }
+        POLoad succLoad = null;
+        for (PhysicalOperator root : loads) {
+            if (!(root instanceof POLoad)) { // Huh?
+                log.debug("Successor's roots are not loads");
+                return;
+            }
+            POLoad sl = (POLoad)root;
+            if (loadFile.equals(sl.getLFile().getFileName()) &&
+                    "org.apache.pig.builtin.BinStorage".equals(sl.getLFile().getFuncName())) {
+                succLoad = sl;
+                break;
+            }
+        }
+
+        if (succLoad == null) {
+            log.debug("Could not find load that matched file we are sampling.");
+            return;
+        }
+
+        // Okay, we're on.
+        // First, replace our RandomSampleLoader with a RandomSampleLoader that
+        // uses the load function from our predecessor.
+        String[] rslargs = new String[2];
+        FileSpec predFs = predLoad.getLFile();
+        // First argument is FuncSpec of loader function to subsume, this we
+        // want to set for ourselves.
+        rslargs[0] = predFs.getFuncSpec().toString();
+        // Second argument is the number of samples per block, read this from
+        // the original.
+        rslargs[1] = samplerArgs[1];
+        FileSpec fs = new FileSpec(predFs.getFileName(),
+                new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+        POLoad newLoad = new POLoad(load.getOperatorKey(),
+                load.getRequestedParallelism(), fs, load.isSplittable());
+        try {
+            mr.mapPlan.replace(load, newLoad);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+
+        // Second, replace the loader in our successor with whatever the
+        // originally used loader was.
+        fs = new FileSpec(predFs.getFileName(), predFs.getFuncSpec());
+        newLoad = new POLoad(succLoad.getOperatorKey(),
+                succLoad.getRequestedParallelism(), fs, succLoad.isSplittable());
+        try {
+            succ.mapPlan.replace(succLoad, newLoad);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+
+        // Cannot delete the pred right now, because we are still traversing
+        // the graph. So, mark the pred and remove it from the plan once the
+        // visit by this optimizer is complete.
+        opsToRemove.add(pred);
+    }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Jun 30 01:10:29 2009
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.URL;
import java.util.Iterator;
import java.util.Map;
@@ -35,8 +34,8 @@
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.SamplableLoader;
import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataReaderWriter;
@@ -44,14 +43,10 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.WrappedIOException;
-
-public class BinStorage implements ReversibleLoadStoreFunc {
+public class BinStorage implements ReversibleLoadStoreFunc, SamplableLoader {
public static final byte RECORD_1 = 0x01;
public static final byte RECORD_2 = 0x02;
public static final byte RECORD_3 = 0x03;
@@ -68,6 +63,16 @@
public BinStorage() {
}
+    /**
+     * Skips forward in the underlying input stream.
+     * @param n number of bytes to skip
+     * @return number of bytes actually skipped
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        return this.in.skip(n);
+    }
+
+    /**
+     * @return the current position of the underlying input stream
+     */
+    @Override
+    public long getPosition() throws IOException {
+        return this.in.getPosition();
+    }
+
public Tuple getNext() throws IOException {
byte b = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Jun 30 01:10:29 2009
@@ -20,9 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Map;
import java.util.Iterator;
@@ -30,9 +28,8 @@
import org.apache.commons.logging.Log;
import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.SamplableLoader;
import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
@@ -40,7 +37,6 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -50,7 +46,7 @@
* http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for
more information.
*/
public class PigStorage extends Utf8StorageConverter
- implements ReversibleLoadStoreFunc {
+ implements ReversibleLoadStoreFunc, SamplableLoader {
protected BufferedPositionedInputStream in = null;
protected final Log mLog = LogFactory.getLog(getClass());
@@ -101,6 +97,16 @@
}
}
+    /**
+     * Skips forward in the underlying input stream.
+     * @param n number of bytes to skip
+     * @return number of bytes actually skipped
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        return this.in.skip(n);
+    }
+
+    /**
+     * @return the current position of the underlying input stream
+     */
+    @Override
+    public long getPosition() throws IOException {
+        return this.in.getPosition();
+    }
+
public Tuple getNext() throws IOException {
if (in == null || in.getPosition() > end) {
return null;
@@ -336,5 +342,4 @@
return null;
}
-
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java Tue Jun 30 01:10:29 2009
@@ -36,6 +36,10 @@
*
* Fields are numbered from 0.
*/
+
+// Put in to make the compiler not complain about WritableComparable
+// being a generic type.
+...@suppresswarnings("unchecked")
public interface Tuple extends WritableComparable, Serializable {
/**
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Tue Jun 30 01:10:29 2009
@@ -18,37 +18,58 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.util.Map;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
-public class RandomSampleLoader extends BinStorage {
+/**
+ * A loader that samples the data. This loader can subsume any loader that
+ * can handle starting in the middle of a record. Loaders that can
+ * handle this should implement the SamplableLoader interface.
+ */
+public class RandomSampleLoader implements LoadFunc {
- public static int defaultNumSamples = 100;
- int numSamples = defaultNumSamples;
+ private int numSamples;
private long skipInterval;
+ private SamplableLoader loader;
+
+    /**
+     * Construct with a class of loader to use.
+     * Arguments are passed as strings instead of actual types (FuncSpec, int)
+     * because FuncSpec only supports string arguments to UDF constructors.
+     * @param funcSpec string form of the FuncSpec of the loader to use; the
+     * loader must implement {@link SamplableLoader}.
+     * @param ns number of samples per map to collect; must parse as a
+     * positive integer.
+     * @throws IllegalArgumentException if the loader does not implement
+     * SamplableLoader or if ns is not a positive integer (a malformed number
+     * surfaces as NumberFormatException, a subclass of
+     * IllegalArgumentException).
+     */
+    public RandomSampleLoader(String funcSpec, String ns) {
+        Object func = PigContext.instantiateFuncFromSpec(funcSpec);
+        if (!(func instanceof SamplableLoader)) {
+            throw new IllegalArgumentException("Loader " + funcSpec
+                    + " does not implement " + SamplableLoader.class.getName()
+                    + " and cannot be used by "
+                    + RandomSampleLoader.class.getName());
+        }
+        loader = (SamplableLoader)func;
+        numSamples = Integer.parseInt(ns);
+        // bindTo() divides the input span by numSamples: zero would fail
+        // there with an ArithmeticException and a negative value would
+        // silently disable skipping, so reject both up front.
+        if (numSamples <= 0) {
+            throw new IllegalArgumentException(
+                    "Number of samples must be positive, got " + ns);
+        }
+    }
+    /**
+     * Binds the wrapped loader to the given stream and sets the number of
+     * bytes to skip between samples to (end - offset) / numSamples, using
+     * integer division.
+     */
@Override
public void bindTo(String fileName, BufferedPositionedInputStream is, long
offset, long end) throws IOException {
skipInterval = (end - offset)/numSamples;
- super.bindTo(fileName, is, offset, end);
+ // Reading is delegated to the wrapped loader; getNext() only adds skips.
+ loader.bindTo(fileName, is, offset, end);
}
@Override
public Tuple getNext() throws IOException {
- long initialPos = in.getPosition();
- Tuple t = super.getNext();
- long finalPos = in.getPosition();
+ long initialPos = loader.getPosition();
+ Tuple t = loader.getNext();
+ long finalPos = loader.getPosition();
long toSkip = skipInterval - (finalPos - initialPos);
if (toSkip > 0) {
- long rc = in.skip(toSkip);
+ long rc = loader.skip(toSkip);
// if we did not skip enough
// in the first attempt, call
@@ -56,7 +77,7 @@
// skip enough
long remainingSkip = toSkip - rc;
while(remainingSkip > 0) {
- rc = in.skip(remainingSkip);
+ rc = loader.skip(remainingSkip);
if(rc == 0) {
// underlying stream saw EOF
break;
@@ -67,13 +88,46 @@
return t;
}
- @Override
- public void bindTo(OutputStream os) throws IOException {
- int errCode = 2101;
- String msg = this.getClass().getName() + " should not be used for
storing.";
- throw new ExecException(msg, errCode, PigException.BUG);
+    // The remaining LoadFunc methods delegate to the wrapped loader, so type
+    // conversion and schema handling are exactly those of the wrapped loader.
+
+    @Override
+    public Integer bytesToInteger(byte[] b) throws IOException {
+        return loader.bytesToInteger(b);
}
-
-
-
-}
+
+    @Override
+    public Long bytesToLong(byte[] b) throws IOException {
+        return loader.bytesToLong(b);
+    }
+
+    @Override
+    public Float bytesToFloat(byte[] b) throws IOException {
+        return loader.bytesToFloat(b);
+    }
+
+    @Override
+    public Double bytesToDouble(byte[] b) throws IOException {
+        return loader.bytesToDouble(b);
+    }
+
+    @Override
+    public String bytesToCharArray(byte[] b) throws IOException {
+        return loader.bytesToCharArray(b);
+    }
+
+    @Override
+    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        return loader.bytesToMap(b);
+    }
+
+    @Override
+    public Tuple bytesToTuple(byte[] b) throws IOException {
+        return loader.bytesToTuple(b);
+    }
+
+    @Override
+    public DataBag bytesToBag(byte[] b) throws IOException {
+        return loader.bytesToBag(b);
+    }
+
+    @Override
+    public void fieldsToRead(Schema schema) {
+        loader.fieldsToRead(schema);
+    }
+
+    @Override
+    public Schema determineSchema(
+            String fileName,
+            ExecType execType,
+            DataStorage storage) throws IOException {
+        return loader.determineSchema(fileName, execType, storage);
+    }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=789523&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Tue Jun 30 01:10:29 2009
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SampleOptimizer;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Test;
+
+public class TestSampleOptimizer {
+
+    private static final String SAMPLE_LOADER =
+        "org.apache.pig.impl.builtin.RandomSampleLoader";
+
+    static PigContext pc;
+    static {
+        pc = new PigContext(ExecType.MAPREDUCE, MiniCluster.buildCluster().getProperties());
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            // Every test needs a connected context; fail class initialization
+            // loudly instead of running the tests against a broken one.
+            throw new RuntimeException("Unable to connect PigContext to the mini cluster", e);
+        }
+    }
+
+    /**
+     * Counts the MR jobs on the chain that starts at the first root of
+     * {@code mrPlan} and follows the first successor of each job.
+     */
+    private static int countJobs(MROperPlan mrPlan) {
+        int count = 1;
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        while (mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * Asserts that the job's map plan has a single root which is a load, and
+     * returns the name of that load's function.
+     */
+    private static String singleRootLoadFunc(MapReduceOper mrOper) {
+        List<PhysicalOperator> phyOps = mrOper.mapPlan.getRoots();
+        assertEquals(1, phyOps.size());
+        assertTrue(phyOps.get(0) instanceof POLoad);
+        return ((POLoad)phyOps.get(0)).getLFile().getFuncName();
+    }
+
+    @Test
+    public void testOptimizerFired() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan(" B = order A by $0;");
+        LogicalPlan lp = planTester.buildPlan("store B into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3, countJobs(mrPlan));
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        // After optimizer visits, number of MR jobs = 2.
+        assertEquals(2, countJobs(mrPlan));
+
+        // Test if RandomSampleLoader got pushed to top.
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        assertEquals(SAMPLE_LOADER, singleRootLoadFunc(mrOper));
+
+        // Test RandomSampleLoader is not present anymore in second MR job.
+        MapReduceOper next = mrPlan.getSuccessors(mrOper).get(0);
+        assertFalse(SAMPLE_LOADER.equals(singleRootLoadFunc(next)));
+    }
+
+    @Test
+    public void testOptimizerNotFired() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan(" C = order B by $0;");
+        LogicalPlan lp = planTester.buildPlan("store C into 'output';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3, countJobs(mrPlan));
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        // After optimizer visits, number of MR jobs = 3, since the first job
+        // has a reduce phase and the optimizer must not fire.
+        assertEquals(3, countJobs(mrPlan));
+
+        // Test Sampler is not moved and is present in 2nd MR job.
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        assertFalse(SAMPLE_LOADER.equals(singleRootLoadFunc(mrOper)));
+
+        MapReduceOper next = mrPlan.getSuccessors(mrOper).get(0);
+        assertEquals(SAMPLE_LOADER, singleRootLoadFunc(next));
+    }
+
+    // End to end is more comprehensively tested in TestOrderBy and
+    // TestOrderBy2. But since those tests are currently excluded, this simple
+    // end to end test is included.
+    @Test
+    public void testEndToEnd() throws Exception {
+        PigServer pigServer = new PigServer(pc);
+        final int loopCount = 40;
+        File tmpFile = File.createTempFile("test", ".txt");
+        try {
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            try {
+                Random r = new Random(3);
+                for (int i = 0; i < loopCount; i++) {
+                    ps.println(r.nextInt(100));
+                }
+            } finally {
+                ps.close();
+            }
+
+            pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString())
+                    + "' using PigStorage() AS (num:int);");
+            pigServer.registerQuery("B = order A by num desc;");
+            Iterator<Tuple> result = pigServer.openIterator("B");
+
+            int rows = 0;
+            Integer prevNum = null;
+            while (result.hasNext()) {
+                Integer curNum = (Integer)result.next().get(0);
+                if (prevNum != null) {
+                    assertTrue(curNum.compareTo(prevNum) <= 0);
+                }
+                prevNum = curNum;
+                rows++;
+            }
+            // Sorting must neither drop nor duplicate records.
+            assertEquals(loopCount, rows);
+        } finally {
+            // Clean up even when an assertion above fails.
+            tmpFile.delete();
+        }
+    }
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Tue Jun 30 01:10:29 2009
@@ -78,7 +78,7 @@
| | |
| | Project[tuple][*] - -131
| |
- |
|---Load(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader)
- -129
+ |
|---Load(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.builtin.BinStorage','100'))
- -129
|
|---MapReduce(-1) - -127:
Reduce Plan Empty