Author: hashutosh
Date: Wed Apr 14 23:50:54 2010
New Revision: 934245

URL: http://svn.apache.org/viewvc?rev=934245&view=rev
Log:
PIG-1363: Unnecessary loadFunc instantiations

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=934245&r1=934244&r2=934245&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Wed Apr 14 23:50:54 2010 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1363: Unnecessary loadFunc instantiations (hashutosh) + PIG-1370: Marking Pig interface for org.apache.pig package (gates) PIG-1354: UDFs for dynamic invocation of simple Java methods (dvryaboy) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=934245&r1=934244&r2=934245&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Apr 14 23:50:54 2010 @@ -925,13 +925,12 @@ public class MRCompiler extends PhyPlanV throw new MRCompilerException(errMsg,errCode,PigException.BUG); } - POLoad loader = (POLoad)phyOp; - Object loadFunc = PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec()); + + LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc(); try { if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){ throw new MRCompilerException("While using 'collected' on group; data must 
be loaded via loader implementing CollectableLoadFunc."); } - ((LoadFunc)loadFunc).setUDFContextSignature(loader.getSignature()); ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit(); } catch (MRCompilerException e){ throw (e); @@ -1126,13 +1125,12 @@ public class MRCompiler extends PhyPlanV POLoad sideLoader = (POLoad)rootPOOp; FileSpec loadFileSpec = sideLoader.getLFile(); FuncSpec funcSpec = loadFileSpec.getFuncSpec(); - Object loadfunc = PigContext.instantiateFuncFromSpec(funcSpec); + LoadFunc loadfunc = sideLoader.getLoadFunc(); if(i == 0){ if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc."); - ((LoadFunc)loadfunc).setUDFContextSignature(sideLoader.getSignature()); ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit(); continue; } @@ -1200,7 +1198,7 @@ public class MRCompiler extends PhyPlanV POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0); FileSpec origLoaderFileSpec = baseLoader.getLFile(); FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec(); - Object loadFunc = PigContext.instantiateFuncFromSpec(funcSpec); + LoadFunc loadFunc = baseLoader.getLoadFunc(); if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){ int errCode = 1104; @@ -1377,9 +1375,8 @@ public class MRCompiler extends PhyPlanV // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad. 
POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0); - LoadFunc rightLoadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoader.getLFile().getFuncSpec()); - joinOp.setSignature(rightLoader.getSignature()); - if(rightLoadFunc instanceof IndexableLoadFunc) { + LoadFunc rightLoadFunc = rightLoader.getLoadFunc(); + if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) { joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec()); joinOp.setRightInputFileName(rightLoader.getLFile().getFileName()); @@ -1417,17 +1414,21 @@ public class MRCompiler extends PhyPlanV } else { // Replace POLoad with indexer. - String[] indexerArgs = new String[6]; - FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); - indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); - if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof OrderedLoadFunc)){ + + LoadFunc loadFunc = rightLoader.getLoadFunc(); + if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){ int errCode = 1104; String errMsg = "Right input of merge-join must implement " + "OrderedLoadFunc interface. 
The specified loader " - + indexerArgs[0] + " doesn't implement it"; + + loadFunc + " doesn't implement it"; throw new MRCompilerException(errMsg,errCode); } + + String[] indexerArgs = new String[6]; List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1); + FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); + + indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans); indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan); indexerArgs[3] = rightLoader.getSignature(); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=934245&r1=934244&r2=934245&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Apr 14 23:50:54 2010 @@ -1640,7 +1640,7 @@ public class LogToPhyTranslationVisitor public void visit(LOLoad loLoad) throws VisitorException { String scope = loLoad.getOperatorKey().scope; POLoad load = new POLoad(new OperatorKey(scope, nodeGen - .getNextNodeId(scope))); + .getNextNodeId(scope)),loLoad.getLoadFunc()); load.setAlias(loLoad.getAlias()); load.setLFile(loLoad.getInputFile()); load.setPc(pc); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=934245&r1=934244&r2=934245&view=diff 
============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Wed Apr 14 23:50:54 2010 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; import java.io.IOException; -import java.io.InputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,8 +27,6 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.io.ReadToEndLoader; import org.apache.pig.impl.plan.OperatorKey; @@ -55,15 +52,13 @@ public class POLoad extends PhysicalOper */ private static final long serialVersionUID = 1L; // The user defined load function or a default load function - transient LoadFunc loader = null; + private transient LoadFunc loader = null; // The filespec on which the operator is based FileSpec lFile; // PigContext passed to us by the operator creator PigContext pc; //Indicates whether the loader setup is done or not boolean setUpDone = false; - // default offset. 
- private long offset = 0; // Alias for the POLoad private String signature; @@ -73,21 +68,20 @@ public class POLoad extends PhysicalOper this(k,-1, null); } - public POLoad(OperatorKey k, FileSpec lFile){ this(k,-1,lFile); } - public POLoad(OperatorKey k, FileSpec lFile, long offset){ - this(k,-1,lFile); - this.offset = offset; - } - public POLoad(OperatorKey k, int rp, FileSpec lFile) { super(k, rp); this.lFile = lFile; } + public POLoad(OperatorKey k, LoadFunc lf){ + this(k); + this.loader = lf; + } + /** * Set up the loader by * 1) Instantiating the load func @@ -96,15 +90,10 @@ public class POLoad extends PhysicalOper * @throws IOException */ public void setUp() throws IOException{ - String filename = lFile.getFileName(); - LoadFunc origloader = - (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()); - loader = new ReadToEndLoader(origloader, + loader = new ReadToEndLoader((LoadFunc) + PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()), ConfigurationUtil.toConfiguration(pc.getProperties()), - filename, - 0); - - + lFile.getFileName(),0); } /** @@ -207,4 +196,8 @@ public class POLoad extends PhysicalOper public void setSignature(String signature) { this.signature = signature; } + + public LoadFunc getLoadFunc(){ + return this.loader; + } }