Author: pradeepkth Date: Thu Mar 25 23:39:55 2010 New Revision: 927640 URL: http://svn.apache.org/viewvc?rev=927640&view=rev Log: LOLoad should cache results of LoadMetadata.getSchema() for use in subsequent calls to LOLoad.getSchema() or LOLoad.determineSchema() (pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=927640&r1=927639&r2=927640&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Mar 25 23:39:55 2010 @@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1317: LOLoad should cache results of LoadMetadata.getSchema() for use in +subsequent calls to LOLoad.getSchema() or LOLoad.determineSchema() +(pradeepkth) + OPTIMIZATIONS BUG FIXES Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=927640&r1=927639&r2=927640&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu Mar 25 23:39:55 2010 @@ -60,6 +60,8 @@ public class LOLoad extends RelationalOp private static Log log = LogFactory.getLog(LOLoad.class); private Schema mDeterminedSchema = null; private RequiredFieldList requiredFieldList; + + private boolean mDeterminedSchemaCached = false; /** * @param plan @@ -146,7 +148,6 @@ public class LOLoad extends RelationalOp if(null == mDeterminedSchema) { mSchema = determineSchema(); - mDeterminedSchema = mSchema; } mIsSchemaComputed = true; } catch (IOException ioe) { @@ -162,13 +163,18 @@ public class LOLoad extends RelationalOp } private Schema determineSchema() throws IOException { - if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) { - LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc; - ResourceSchema rSchema = loadMetadata.getSchema( - 
mInputFileSpec.getFileName(), new Job(conf)); - return Schema.getPigSchema(rSchema); + if(!mDeterminedSchemaCached) { + if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) { + LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc; + ResourceSchema rSchema = loadMetadata.getSchema( + mInputFileSpec.getFileName(), new Job(conf)); + mDeterminedSchema = Schema.getPigSchema(rSchema); + } + // set the flag so that future calls just use mDeterminedSchema + mDeterminedSchemaCached = true; + return mDeterminedSchema; } else { - return null; + return mDeterminedSchema; } } /* (non-Javadoc) Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=927640&r1=927639&r2=927640&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Thu Mar 25 23:39:55 2010 @@ -17,15 +17,27 @@ */ package org.apache.pig.test; +import java.io.File; import java.io.FileInputStream; +import java.io.IOException; +import org.apache.hadoop.mapreduce.Job; import org.apache.pig.ExecType; +import org.apache.pig.Expression; +import org.apache.pig.LoadMetadata; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.builtin.BinStorage; +import org.apache.pig.impl.logicalLayer.LOPrinter; import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.logicalLayer.PlanSetter; import org.apache.pig.impl.logicalLayer.optimizer.ImplicitSplitInserter; import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer; import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer; import org.apache.pig.impl.logicalLayer.optimizer.TypeCastInserter; +import org.apache.pig.impl.logicalLayer.parser.ParseException; import 
org.apache.pig.impl.plan.optimizer.OptimizerException; +import org.apache.pig.impl.util.Utils; import org.apache.pig.test.utils.LogicalPlanTester; import org.junit.Test; @@ -255,6 +267,37 @@ public class TestLogicalOptimizer extend optimizePlan(plan); } + /** + * test to check that {@link LoadMetadata#getSchema(String, Job)} is called + * only once even if the optimizer is fired and schemas and projection maps + * are rebuilt + */ + @Test + public void testLoadGetSchemaCalledOnce() throws Exception { + String checkFileName = "checkLoadGetSchemaCalledOnce.txt"; + new File(checkFileName).delete(); + try{ + + planTester.buildPlan("A = load 'myfile' using " + + DummyMetadataLoader.class.getName() + "('"+ checkFileName +"');"); + planTester.buildPlan("B = foreach A generate $0 ;"); + LogicalPlan plan = planTester.buildPlan("C = limit B 10;"); + new LOPrinter(System.err, plan).visit(); + // Set the logical plan values correctly in all the operators + PlanSetter ps = new PlanSetter(plan); + ps.visit(); + // the optimizer should run atleast one iteration + LogicalOptimizerDerivative optimizer = + new LogicalOptimizerDerivative(plan); + int numIterations = optimizer.optimize(); + assertTrue(numIterations > 0); + assertTrue(new File(checkFileName).exists()); + } finally { + new File(checkFileName).delete(); + } + + } + // a subclass of LogicalOptimizer which can return the maximum iterations // the optimizer would try the check() and transform() methods static class LogicalOptimizerDerivative extends LogicalOptimizer { @@ -266,5 +309,59 @@ public class TestLogicalOptimizer extend return mMaxIterations; } } + + /** + * A dummy loader which extends {@link LoadMetadata} and in the + * {@link LoadMetadata#getSchema(String, Job)} implementation checks that + * the method is only called once. 
+ */ + public static class DummyMetadataLoader extends BinStorage implements LoadMetadata { + + String checkFileName; + + public DummyMetadataLoader() { + + } + + public DummyMetadataLoader(String checkFileName) { + this.checkFileName = checkFileName; + } + + @Override + public String[] getPartitionKeys(String location, Job job) + throws IOException { + return null; + } + + @Override + public ResourceSchema getSchema(String location, Job job) + throws IOException { + try { + + // the create() below will fail is this method gets called + // more than once + if(!new File(checkFileName).createNewFile()) { + throw new RuntimeException(checkFileName + " already exists!"); + } + return new ResourceSchema( + Utils.getSchemaFromString("a:chararray,b:int")); + } catch (ParseException e) { + throw new IOException(e); + } + } + + @Override + public ResourceStatistics getStatistics(String location, Job job) + throws IOException { + return null; + } + + @Override + public void setPartitionFilter(Expression partitionFilter) + throws IOException { + + } + + } }