Author: pradeepkth
Date: Fri Feb 19 23:48:11 2010
New Revision: 912038
URL: http://svn.apache.org/viewvc?rev=912038&view=rev
Log:
PIG-1218: Use distributed cache to store samples (rding via pradeepkth)
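
Summary: the ORDER BY quantiles file, the skewed-join key distribution
file, and the replicated inputs of a fragment-replicate join are now
shipped to task nodes through Hadoop's distributed cache (exposed as
symlinks in the task working directory) instead of being opened on HDFS
by every task. A minimal sketch of the Hadoop mechanism the patch builds
on; the path and symlink name below are hypothetical, the real names are
generated in JobControlCompiler:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class CacheSymlinkSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // The "#pigsample_1234" fragment makes Hadoop create a symlink
            // with that name in each task's working directory.
            DistributedCache.addCacheFile(
                    new URI("/tmp/temp-42/quantiles#pigsample_1234"), conf);
            DistributedCache.createSymlink(conf);
            // Each node fetches the file once; tasks then read it through
            // the local file system under the symlink name.
        }
    }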
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 19 23:48:11 2010
@@ -55,6 +55,8 @@
IMPROVEMENTS
+PIG-1218: Use distributed cache to store samples (rding via pradeepkth)
+
PIG-1226: support for additional jar files (thejas via olgan)
PIG-1230: Streaming input in POJoinPackage should use nonspillable bag to
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 19 23:48:11 2010
@@ -43,6 +43,7 @@
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ComparisonFunc;
+import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
@@ -54,6 +55,9 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -75,11 +79,14 @@
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
+
/**
* This is compiler class that takes an MROperPlan and converts
* it into a JobControl object with the relevant dependency info
@@ -509,6 +516,11 @@
// currently the parent plan is really used only when POStream is present in the plan
new PhyPlanSetter(mro.mapPlan).visit();
new PhyPlanSetter(mro.reducePlan).visit();
+
+ // this call modifies the ReplFiles names of POFRJoin operators
+ // within the MR plans, must be called before the plans are
+ // serialized
+ setupDistributedCacheForFRJoin(mro, pigContext, conf);
POPackage pack = null;
if(mro.reducePlan.isEmpty()){
@@ -582,7 +594,9 @@
// Only set the quantiles file and sort partitioner if we're a
// global sort, not for limit after sort.
if (mro.isGlobalSort()) {
- conf.set("pig.quantilesFile", mro.getQuantFile());
+ String symlink = addSingleFileToDistributedCache(
+ pigContext, conf, mro.getQuantFile(), "pigsample");
+ conf.set("pig.quantilesFile", symlink);
nwJob.setPartitionerClass(WeightedRangePartitioner.class);
}
@@ -613,32 +627,15 @@
}
if (mro.isSkewedJoin()) {
- conf.set("pig.keyDistFile", mro.getSkewedJoinPartitionFile());
+ String symlink = addSingleFileToDistributedCache(pigContext,
+ conf, mro.getSkewedJoinPartitionFile(), "pigdistkey");
+ conf.set("pig.keyDistFile", symlink);
nwJob.setPartitionerClass(SkewedPartitioner.class);
nwJob.setMapperClass(PigMapReduce.MapWithPartitionIndex.class);
nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
}
- if (mro.isFrjoin()) {
- // set up distributed cache for the replicated files
- FileSpec[] replFiles = mro.getReplFiles();
- ArrayList<String> replicatedPath = new ArrayList<String>();
- // the first input is not replicated
- for(int i=0; i < replFiles.length; i++) {
- // ignore fragmented file
- if (i != mro.getFragment()) {
- replicatedPath.add(replFiles[i].getFileName());
- }
- }
- try {
- setupDistributedCache(pigContext, conf, replicatedPath.toArray(new String[0]) , false);
- } catch (IOException e) {
- String msg = "Internal error. Distributed cache could not be set up for the replicated files";
- throw new IOException (msg, e);
- }
- }
-
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
@@ -950,6 +947,16 @@
}
}
+ private void setupDistributedCacheForFRJoin(MapReduceOper mro,
+ PigContext pigContext, Configuration conf) throws IOException {
+
+ new FRJoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
+ .visit();
+
+ new FRJoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
+ .visit();
+ }
+
private static void setupDistributedCache(PigContext pigContext,
Configuration conf,
Properties properties, String key,
@@ -1022,4 +1029,86 @@
}
}
}
+
+ private static String addSingleFileToDistributedCache(
+ PigContext pigContext, Configuration conf, String filename,
+ String prefix) throws IOException {
+
+ if (!FileLocalizer.fileExists(filename, pigContext)) {
+ throw new IOException(
+ "Internal error: skew join partition file "
+ + filename + " does not exist");
+ }
+
+ String symlink = filename;
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added by Hadoop team.
+ if (pigContext.getExecType() != ExecType.LOCAL) {
+ symlink = prefix + "_"
+ + Integer.toString(System.identityHashCode(filename)) + "_"
+ + Long.toString(System.currentTimeMillis());
+ filename = filename + "#" + symlink;
+ setupDistributedCache(pigContext, conf, new String[] { filename },
+ false);
+ }
+
+ return symlink;
+ }
+
+ private static class FRJoinDistributedCacheVisitor extends PhyPlanVisitor {
+
+ private PigContext pigContext = null;
+
+ private Configuration conf = null;
+
+ public FRJoinDistributedCacheVisitor(PhysicalPlan plan,
+ PigContext pigContext, Configuration conf) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+ plan));
+ this.pigContext = pigContext;
+ this.conf = conf;
+ }
+
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ // set up distributed cache for the replicated files
+ FileSpec[] replFiles = join.getReplFiles();
+ ArrayList<String> replicatedPath = new ArrayList<String>();
+
+ FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+
+ // the first input is not replicated
+ for (int i = 0; i < replFiles.length; i++) {
+ // ignore fragmented file
+ String symlink = "";
+ if (i != join.getFragment()) {
+ symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
+ + Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
+ + "_" + Long.toString(System.currentTimeMillis())
+ + "_" + i;
+ replicatedPath.add(replFiles[i].getFileName() + "#"
+ + symlink);
+ }
+ newReplFiles[i] = new FileSpec(symlink,
+ (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
+ }
+
+ join.setReplFiles(newReplFiles);
+
+ try {
+ setupDistributedCache(pigContext, conf, replicatedPath
+ .toArray(new String[0]), false);
+ } catch (IOException e) {
+ String msg = "Internal error. Distributed cache could not " +
+ "be set up for the replicated files";
+ throw new VisitorException(msg, e);
+ }
+ }
+ }
+
}
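
On the task side the cached file is opened through the local file system
under its symlink name, which is what the partitioner changes below do
via ReadToEndLoader. A standalone sketch of that read path (the class and
method are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocalCacheReadSketch {
        public static FSDataInputStream open(Configuration jobConf)
                throws java.io.IOException {
            // JobControlCompiler stored the symlink name (e.g. "pigsample_...")
            // under "pig.quantilesFile" in the job configuration.
            String link = jobConf.get("pig.quantilesFile");
            // The symlink sits in the task's working directory, so a relative
            // path on the local file system resolves to the cached copy.
            FileSystem localFs = FileSystem.getLocal(jobConf);
            return localFs.open(new Path(link));
        }
    }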
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 19 23:48:11 2010
@@ -1029,9 +1029,6 @@
addUDFs(plan);
}
}
- curMROp.setFrjoin(true);
- curMROp.setFragment(op.getFragment());
- curMROp.setReplFiles(op.getReplFiles());
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
@@ -1491,7 +1488,6 @@
String msg = "Unable to set index on newly created POLocalRearrange.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
- ((POPartitionRearrange)lr).setPartitionFile(partitionFile.getFileName());
groups = (List<PhysicalPlan>)joinPlans.get(l.get(1));
lr.setPlans(groups);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 19 23:48:11 2010
@@ -116,12 +116,6 @@
private String scope;
-
- //Fragment Replicate Join State
- boolean frjoin = false;
- FileSpec[] replFiles = null;
- int fragment = -1;
-
int requestedParallelism = -1;
// Last POLimit value in this map reduce operator, needed by LimitAdjuster
@@ -369,29 +363,6 @@
public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
this.endOfAllInputInReduce = endOfAllInputInReduce;
}
- public int getFragment() {
- return fragment;
- }
-
- public void setFragment(int fragment) {
- this.fragment = fragment;
- }
-
- public boolean isFrjoin() {
- return frjoin;
- }
-
- public void setFrjoin(boolean frjoin) {
- this.frjoin = frjoin;
- }
-
- public FileSpec[] getReplFiles() {
- return replFiles;
- }
-
- public void setReplFiles(FileSpec[] replFiles) {
- this.replFiles = replFiles;
- }
public int getRequestedParallelism() {
return requestedParallelism;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Feb 19 23:48:11 2010
@@ -107,7 +107,8 @@
try {
Integer [] redCnt = new Integer[1];
- reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
+ reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+ keyDistFile, redCnt, DataType.TUPLE);
totalReducers = redCnt[0];
} catch (Exception e) {
throw new RuntimeException(e);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 19 23:48:11 2010
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,21 +29,17 @@
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
@@ -54,8 +49,11 @@
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.Pair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
@@ -64,6 +62,8 @@
public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts
= new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+ private static final Log log = LogFactory.getLog(WeightedRangePartitioner.class);
+
Configuration job;
@SuppressWarnings("unchecked")
@@ -90,15 +90,23 @@
@Override
public void setConf(Configuration configuration) {
job = configuration;
+
+ String quantilesFile = configuration.get("pig.quantilesFile", "");
- String quantilesFile = job.get("pig.quantilesFile", "");
-
- if (quantilesFile.length() == 0)
- throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
+ if (quantilesFile.length() == 0) {
+ throw new RuntimeException(this.getClass().getSimpleName()
+ + " used but no quantiles found");
+ }
try{
- ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job,
- quantilesFile, 0);
+
+
+ // use local file system to get the quantilesFile
+ Configuration conf = new Configuration(false);
+ conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+
+ ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(),
+ conf, quantilesFile, 0);
DataBag quantilesList;
Tuple t = loader.getNext();
if(t!=null)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 19 23:48:11 2010
@@ -18,14 +18,20 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +43,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.NonSpillableDataBag;
@@ -48,6 +55,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
/**
* The operator models the join keys using the Local Rearrange operators which
@@ -67,7 +75,7 @@
*
*/
private static final long serialVersionUID = 1L;
- static private Log log = LogFactory.getLog(POFRJoin.class);
+ static private Log log = LogFactory.getLog(POFRJoin.class);
// The number in the input list which denotes the fragmented input
private int fragment;
// There can be n inputs each being a List<PhysicalPlan>
@@ -311,9 +319,11 @@
POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile);
- PigContext pc = new PigContext(ExecType.MAPREDUCE,
- ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
- pc.connect();
+
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+
+ PigContext pc = new PigContext(ExecType.LOCAL, props);
ld.setPc(pc);
// We use LocalRearrange Operator to separate Key and Values
// eg. ( a, b, c ) would generate a, ( a, b, c )
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Feb 19 23:48:11 2010
@@ -17,43 +17,26 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-import java.io.InputStream;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-
-
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.TupleFactory;
-
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
/**
@@ -68,7 +51,7 @@
*
*/
private static final long serialVersionUID = 1L;
- private String partitionFile;
+
private Integer totalReducers = -1;
// ReducerMap will store the tuple, max reducer index & min reducer index
private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
@@ -95,26 +78,20 @@
leafOps = new ArrayList<ExpressionOperator>();
}
- /* Returns the name for the partition sampling file */
- public String getPartitionFile() {
- return partitionFile;
- }
-
- /* Set the partition sampling file */
- public void setPartitionFile(String file) {
- partitionFile = file;
- }
-
/* Loads the key distribution file obtained from the sampler */
private void loadPartitionFile() throws RuntimeException {
+ String keyDistFile = PigMapReduce.sJobConf.get("pig.keyDistFile", "");
+ if (keyDistFile.isEmpty()) {
+ throw new RuntimeException(
+ "Internal error: missing key distribution file property.");
+ }
+
try {
Integer [] redCnt = new Integer[1];
- reducerMap = MapRedUtil.loadPartitionFile(partitionFile,
- redCnt,
- ConfigurationUtil.toConfiguration(pigContext.getProperties()),
- DataType.NULL
- );
+ reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+ keyDistFile, redCnt, DataType.NULL);
+
totalReducers = redCnt[0];
loaded = true;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 19 23:48:11 2010
@@ -23,6 +23,8 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
@@ -46,6 +48,10 @@
*/
public class MapRedUtil {
+ private static Log log = LogFactory.getLog(MapRedUtil.class);
+
+ public static final String FILE_SYSTEM_NAME = "fs.default.name";
+
/**
* This method is to be called from an
* {...@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(
@@ -84,12 +90,17 @@
* @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case.
*/
@SuppressWarnings("unchecked")
- public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String keyDistFile,
- Integer[] totalReducers, Configuration job, byte keyType) throws IOException {
+ public static <E> Map<E, Pair<Integer, Integer>> loadPartitionFileFromLocalCache(
+ String keyDistFile, Integer[] totalReducers, byte keyType)
+ throws IOException {
+
+ Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>();
- Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
+ // use local file system to get the keyDistFile
+ Configuration conf = new Configuration(false);
+ conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job,
+ ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), conf,
keyDistFile, 0);
DataBag partitionList;
Tuple t = loader.getNext();
@@ -141,4 +152,5 @@
udfc.addJobConf(job);
udfc.deserialize();
}
+
}
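
For reference, a sketch of a call site for the renamed helper, mirroring
SkewedPartitioner above (the wrapper class is illustrative only):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.Pair;

    public class PartitionFileSketch {
        // keyDistFile is the symlink name published in the job configuration
        // under "pig.keyDistFile".
        static Map<Tuple, Pair<Integer, Integer>> load(String keyDistFile)
                throws IOException {
            Integer[] redCnt = new Integer[1];
            Map<Tuple, Pair<Integer, Integer>> reducerMap =
                    MapRedUtil.loadPartitionFileFromLocalCache(
                            keyDistFile, redCnt, DataType.TUPLE);
            // redCnt[0] now holds the total number of reducers recorded by
            // the sampler.
            return reducerMap;
        }
    }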
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Fri Feb 19 23:48:11 2010
@@ -25,8 +25,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -520,16 +522,17 @@
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
// Get the sort job
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- jcc.updateMROpPlan(new ArrayList<Job>());
- jobControl = jcc.compile(mrPlan, "Test");
- jcc.updateMROpPlan(new ArrayList<Job>());
- jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
+ Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
+ int counter = 0;
+ while (iter.hasNext()) {
+ MapReduceOper op = iter.next();
+ counter++;
+ if (op.isGlobalSort()) {
+ assertTrue(op.getRequestedParallelism()==100);
+ }
+ }
+ assertEquals(3, counter);
- assertTrue(parallel==100);
-
pc.defaultParallel = -1;
}
@@ -552,15 +555,16 @@
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
// Get the skew join job
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- jcc.updateMROpPlan(new ArrayList<Job>());
- jobControl = jcc.compile(mrPlan, "Test");
- jcc.updateMROpPlan(new ArrayList<Job>());
- jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertTrue(parallel==100);
+ Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
+ int counter = 0;
+ while (iter.hasNext()) {
+ MapReduceOper op = iter.next();
+ counter++;
+ if (op.isSkewedJoin()) {
+ assertTrue(op.getRequestedParallelism()==100);
+ }
+ }
+ assertEquals(3, counter);
pc.defaultParallel = -1;
}