Author: rding
Date: Wed Feb 24 18:13:05 2010
New Revision: 915907

URL: http://svn.apache.org/viewvc?rev=915907&view=rev
Log:
PIG-1079: Modify merge join to use distributed cache to maintain the index

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=915907&r1=915906&r2=915907&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Wed Feb 24 18:13:05 2010 @@ -133,6 +133,9 @@ BUG FIXES +PIG-1079: Modify merge join to use distributed cache to maintain the index +(rding) + PIG-1241: Accumulator is turned on when a map is used with a non-accumulative UDF (yinghe vi olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=915907&r1=915906&r2=915907&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 24 18:13:05 2010 @@ -59,6 +59,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; @@ -221,26 +222,6 @@ return new Path(pathStr); } - private Path makeTmpPath() throws IOException { - Path tmpPath = null; - for (int tries = 0;;) { - try { - tmpPath = - new Path(FileLocalizer - .getTemporaryPath(null, pigContext).toString()); - FileSystem fs = tmpPath.getFileSystem(conf); - tmpPath = tmpPath.makeQualified(fs); - fs.mkdirs(tmpPath); - break; - } catch (IOException ioe) { - if (++tries==100) { - throw ioe; - } - } - } - return tmpPath; - } - /** * Compiles all jobs that have no dependencies removes them from * the plan and returns. Should be called with the same plan until @@ -520,7 +501,7 @@ // this call modifies the ReplFiles names of POFRJoin operators // within the MR plans, must be called before the plans are // serialized - setupDistributedCacheForFRJoin(mro, pigContext, conf); + setupDistributedCacheForJoin(mro, pigContext, conf); POPackage pack = null; if(mro.reducePlan.isEmpty()){ @@ -653,13 +634,13 @@ } public static class PigSecondaryKeyGroupComparator extends WritableComparator { - @SuppressWarnings("unchecked") public PigSecondaryKeyGroupComparator() { // super(TupleFactory.getInstance().tupleClass(), true); super(NullableTuple.class, true); } - @Override + @SuppressWarnings("unchecked") + @Override public int compare(WritableComparable a, WritableComparable b) { PigNullableWritable wa = (PigNullableWritable)a; @@ -947,13 +928,13 @@ } } - private void setupDistributedCacheForFRJoin(MapReduceOper mro, + private void setupDistributedCacheForJoin(MapReduceOper mro, PigContext pigContext, Configuration conf) throws IOException { - new FRJoinDistributedCacheVisitor(mro.mapPlan, 
pigContext, conf) + new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf) .visit(); - new FRJoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf) + new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf) .visit(); } @@ -1056,13 +1037,13 @@ return symlink; } - private static class FRJoinDistributedCacheVisitor extends PhyPlanVisitor { + private static class JoinDistributedCacheVisitor extends PhyPlanVisitor { private PigContext pigContext = null; private Configuration conf = null; - public FRJoinDistributedCacheVisitor(PhysicalPlan plan, + public JoinDistributedCacheVisitor(PhysicalPlan plan, PigContext pigContext, Configuration conf) { super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( plan)); @@ -1110,6 +1091,29 @@ throw new VisitorException(msg, e); } } + + @Override + public void visitMergeJoin(POMergeJoin join) throws VisitorException { + + // XXX Hadoop currently doesn't support distributed cache in local mode. + // This line will be removed after the support is added + if (pigContext.getExecType() == ExecType.LOCAL) return; + + String indexFile = join.getIndexFile(); + + // merge join may not use an index file + if (indexFile == null) return; + + try { + String symlink = addSingleFileToDistributedCache(pigContext, + conf, indexFile, "indexfile_"); + join.setIndexFile(symlink); + } catch (IOException e) { + String msg = "Internal error. 
Distributed cache could not " + + "be set up for merge join index file"; + throw new VisitorException(msg, e); + } + } } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=915907&r1=915906&r2=915907&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 24 18:13:05 2010 @@ -1286,7 +1286,9 @@ defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope; defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName(); joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs))); - joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); + joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); + + joinOp.setIndexFile(strFile.getFileName()); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=915907&r1=915906&r2=915907&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 24 18:13:05 2010 @@ -40,6 +40,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; 
+import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; @@ -87,6 +88,8 @@ private FuncSpec rightLoaderFuncSpec; private String rightInputFileName; + + private String indexFile; // Buffer to hold accumulated left tuples. private List<Tuple> leftTuples; @@ -131,6 +134,7 @@ mTupleFactory = TupleFactory.getInstance(); leftTuples = new ArrayList<Tuple>(arrayListSize); this.createJoinPlans(inpPlans,keyTypes); + this.indexFile = null; } /** @@ -384,9 +388,15 @@ } } - @SuppressWarnings("unchecked") private void seekInRightStream(Object firstLeftKey) throws IOException{ rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); + + // check if hadoop distributed cache is used + if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) { + DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader; + loader.setIndexFile(indexFile); + } + // Pass signature of the loader to rightLoader // make a copy of the conf to use in calls to rightLoader. 
Configuration conf = new Configuration(PigMapReduce.sJobConf); @@ -545,4 +555,12 @@ public void setSignature(String signature) { this.signature = signature; } + + public void setIndexFile(String indexFile) { + this.indexFile = indexFile; + } + + public String getIndexFile() { + return indexFile; + } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=915907&r1=915906&r2=915907&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Wed Feb 24 18:13:05 2010 @@ -21,10 +21,13 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.LoadCaster; @@ -37,6 +40,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; @@ -64,7 +68,7 @@ // Index is modeled as FIFO queue and LinkedList implements java Queue interface. 
private LinkedList<Tuple> index; private FuncSpec rightLoaderFuncSpec; - private PigContext pc; + private String scope; private Tuple dummyTuple = null; private transient TupleFactory mTupleFactory; @@ -105,14 +109,11 @@ // the join key Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys); POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec))); - try { - pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext")); - } catch (IOException e) { - int errCode = 2094; - String msg = "Unable to deserialize pig context."; - throw new ExecException(msg,errCode,e); - } - pc.connect(); + + Properties props = new Properties(); + props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); + + PigContext pc = new PigContext(ExecType.LOCAL, props); ld.setPc(pc); index = new LinkedList<Tuple>(); for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)) @@ -191,6 +192,8 @@ } private void initRightLoader(int [] splitsToBeRead) throws IOException{ + PigContext pc = (PigContext) ObjectSerializer + .deserialize(PigMapReduce.sJobConf.get("pig.pigContext")); //create ReadToEndLoader that will read the given splits in order loader = new ReadToEndLoader( (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec), @@ -255,4 +258,8 @@ // nothing to do } + public void setIndexFile(String indexFile) { + this.indexFile = indexFile; + } + }