Author: gates
Date: Thu Apr 17 08:47:08 2008
New Revision: 649154

URL: http://svn.apache.org/viewvc?rev=649154&view=rev
Log:
PIG-114: store one alias/logicalPlan twice leads to instantiation of StoreFunc as LoadFunc.

Added: incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=649154&r1=649153&r2=649154&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu Apr 17 08:47:08 2008 @@ -237,3 +237,6 @@ PIG-183: Catch when a UDF has been compiled with the wrong version of java and give a RuntimeException (pi_song via gates). + + PIG-114: store one alias/logicalPlan twice leads to instantiation of + StoreFunc as LoadFunc (pi_song via gates). Added: incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java?rev=649154&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java (added) +++ incubator/pig/trunk/src/org/apache/pig/ReversibleLoadStoreFunc.java Thu Apr 17 08:47:08 2008 @@ -0,0 +1,14 @@ +package org.apache.pig; + +/** + * This interface is used to implement classes that can perform both + * Load and Store functionalities in a symmetric fashion (thus reversible). + * + * The symmetry property of implementations is used in the optimization + * engine therefore violation of this property while implementing this + * interface is likely to result in unexpected output from executions. 
+ * + */ +public interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc { + +} Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=649154&r1=649153&r2=649154&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu Apr 17 08:47:08 2008 @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Iterator; +import org.apache.pig.ReversibleLoadStoreFunc; import org.apache.pig.builtin.BinStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.FunctionInstantiator; @@ -83,16 +84,18 @@ MapRedResult materializedResult = materializedResults.get(logicalKey); if (materializedResult != null) { - POMapreduce pom = new POMapreduce(logicalKey.getScope(), - nodeIdGenerator.getNextNodeId(logicalKey.getScope()), - execEngine.getPhysicalOpTable(), - logicalKey, - pigContext); + if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec()) + instanceof ReversibleLoadStoreFunc) { + POMapreduce pom = new POMapreduce(logicalKey.getScope(), + nodeIdGenerator.getNextNodeId(logicalKey.getScope()), + execEngine.getPhysicalOpTable(), logicalKey, + pigContext); - pom.addInputFile(materializedResult.outFileSpec); - pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest); + pom.addInputFile(materializedResult.outFileSpec); + pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest); - return pom.getOperatorKey(); + return pom.getOperatorKey(); + } } // first, compile inputs into MapReduce operators Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=649154&r1=649153&r2=649154&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Apr 17 08:47:08 2008 @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.HashSet; +import org.apache.pig.ReversibleLoadStoreFunc; import org.apache.pig.impl.PigContext; import org.apache.pig.data.DataBag; import org.apache.pig.data.BagFactory; @@ -161,16 +162,20 @@ LocalResult materializedResult = materializedResults.get(logicalKey); if (materializedResult != null) { - ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(), - nodeIdGenerator.getNextNodeId(logicalKey.getScope()), - physicalOpTable, - pigContext, - materializedResult.outFileSpec, - LogicalOperator.FIXED); - OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId()); - - return ppKey; + if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec()) + instanceof ReversibleLoadStoreFunc) { + ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(), + nodeIdGenerator.getNextNodeId(logicalKey.getScope()), + physicalOpTable, + pigContext, + materializedResult.outFileSpec, + LogicalOperator.FIXED); + + OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId()); + return ppKey; + } + } OperatorKey physicalKey = new OperatorKey(); Modified: incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=649154&r1=649153&r2=649154&view=diff ============================================================================== --- 
incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original) +++ incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Thu Apr 17 08:47:08 2008 @@ -24,13 +24,11 @@ import java.io.OutputStream; import java.util.Iterator; -import org.apache.pig.LoadFunc; -import org.apache.pig.StoreFunc; +import org.apache.pig.ReversibleLoadStoreFunc; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BufferedPositionedInputStream; - -public class BinStorage implements LoadFunc, StoreFunc { +public class BinStorage implements ReversibleLoadStoreFunc { Iterator<Tuple> i = null; protected BufferedPositionedInputStream in = null; private DataInputStream inData = null; Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=649154&r1=649153&r2=649154&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original) +++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Apr 17 08:47:08 2008 @@ -21,8 +21,7 @@ import java.io.OutputStream; import java.nio.charset.Charset; -import org.apache.pig.LoadFunc; -import org.apache.pig.StoreFunc; +import org.apache.pig.ReversibleLoadStoreFunc; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BufferedPositionedInputStream; @@ -32,7 +31,7 @@ * delimiter is given as a regular expression. See String.split(delimiter) and * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information. 
*/ -public class PigStorage implements LoadFunc, StoreFunc { +public class PigStorage implements ReversibleLoadStoreFunc { protected BufferedPositionedInputStream in = null; long end = Long.MAX_VALUE; private byte recordDel = (byte)'\n'; Added: incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java?rev=649154&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/TestReversibleLoadStore.java Thu Apr 17 08:47:08 2008 @@ -0,0 +1,241 @@ +package org.apache.pig.test; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import junit.framework.TestCase; + +import org.apache.pig.LoadFunc; +import org.apache.pig.PigServer; +import org.apache.pig.StoreFunc; +import org.apache.pig.ReversibleLoadStoreFunc; +import org.apache.pig.PigServer.ExecType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; + +public class TestReversibleLoadStore extends TestCase { + + static List<Tuple> _storedTuples = new ArrayList<Tuple>(); + + public void testLocalNoReuse() throws Exception { + runNoReuseTest(ExecType.LOCAL) ; + } + + public void testMapReduceNoReuse() throws Exception { + runNoReuseTest(ExecType.MAPREDUCE) ; + } + + public void testLocalReuse() throws Exception { + runReuseTest(ExecType.LOCAL) ; + } + + public void testMapReduceReuse() throws Exception { + runReuseTest(ExecType.MAPREDUCE) ; + } + + public void runNoReuseTest(ExecType runType) throws Exception { + + DummyLoadFunc.readCounterMap = null ; + DummyStoreFunc.writeCounter = 0 ; + + File 
tmpFile = createTempFile() ; + + PigServer pig = new PigServer(ExecType.LOCAL); + pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING " + + DummyLoadFunc.class.getName() + "();"); + + String file1 = "/tmp/testPigOutput" ; + if (pig.existsFile(file1)) { + pig.deleteFile(file1) ; + } + + pig.store("A", file1, DummyStoreFunc.class.getName() + "()"); + + String file2 = "/tmp/testPigOutput2" ; + if (pig.existsFile(file2)) { + pig.deleteFile(file2) ; + } + pig.store("A", file2, DummyStoreFunc.class.getName() + "()"); + + // for this test the plan will not be reused so:- + // - initial temp file has to be read 10 times + // - DummyLoadStoreFunc has to be written 10 times + + assertEquals(10, DummyLoadFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ; + assertEquals(10, DummyStoreFunc.writeCounter) ; + + pig.deleteFile(file1) ; + pig.deleteFile(file2) ; + + } + + public void runReuseTest(ExecType runType) throws Exception { + + DummyLoadStoreFunc.readCounterMap = null ; + DummyLoadStoreFunc.writeCounter = 0 ; + + File tmpFile = createTempFile() ; + + PigServer pig = new PigServer(ExecType.LOCAL); + pig.registerQuery("A = LOAD 'file:" + tmpFile.getAbsolutePath() + "' USING " + + DummyLoadStoreFunc.class.getName() + "();"); + + String file1 = "/tmp/testPigOutput" ; + if (pig.existsFile(file1)) { + pig.deleteFile(file1) ; + } + + pig.store("A", file1, DummyLoadStoreFunc.class.getName() + "()"); + + String file2 = "/tmp/testPigOutput2" ; + if (pig.existsFile(file2)) { + pig.deleteFile(file2) ; + } + pig.store("A", file2, DummyLoadStoreFunc.class.getName() + "()"); + + // for this test the plan will be reused so:- + // - initial temp file has to be read 5 times + // - the output of the first execution has to be read 5 times + // - DummyLoadStoreFunc has to be written 10 times + + assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ; + assertEquals(5, 
DummyLoadStoreFunc.readCounterMap.get("/tmp/testPigOutput").intValue()) ; + assertEquals(10, DummyLoadStoreFunc.writeCounter) ; + + + pig.deleteFile(file1) ; + pig.deleteFile(file2) ; + + } + + private File createTempFile() throws Exception { + File tmpFile = File.createTempFile("test", ".txt"); + if (tmpFile.exists()) { + tmpFile.delete() ; + } + PrintWriter pw = new PrintWriter(tmpFile) ; + pw.println("1,11,111,1111") ; + pw.println("2,22,222,2222") ; + pw.println("3,33,333,3333") ; + pw.println("4,4,444,4444") ; + pw.println("5,55,555,5555") ; + pw.close() ; + tmpFile.deleteOnExit() ; + return tmpFile ; + } + + public static class DummyLoadStoreFunc implements ReversibleLoadStoreFunc { + + public static Map<String,Integer> readCounterMap = null ; + + protected BufferedPositionedInputStream in = null; + private String fileName = null ; + + public void bindTo(String inputfileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + in = is ; + fileName = inputfileName ; + } + + public Tuple getNext() throws IOException { + String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ; + if (line == null) { + return null ; + } + // else + + if (readCounterMap == null) { + readCounterMap = new HashMap<String,Integer>() ; + } + + if (readCounterMap.get(fileName) == null) { + readCounterMap.put(fileName, 1) ; + } + else { + readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ; + } + + return new Tuple(line, ","); + } + + public static int writeCounter = 0 ; + private PrintWriter pw = null ; + + public void bindTo(OutputStream os) throws IOException { + pw = new PrintWriter(os) ; + } + + public void finish() throws IOException { + pw.close() ; + } + + public void putNext(Tuple tuple) throws IOException { + writeCounter++ ; + pw.println(tuple.toDelimitedString(",")); + } + + } + + public static class DummyLoadFunc implements LoadFunc { + + public static Map<String,Integer> readCounterMap = null ; + + protected 
BufferedPositionedInputStream in = null; + private String fileName = null ; + + public void bindTo(String inputfileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + in = is ; + fileName = inputfileName ; + } + + public Tuple getNext() throws IOException { + String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ; + if (line == null) { + return null ; + } + // else + + if (readCounterMap == null) { + readCounterMap = new HashMap<String,Integer>() ; + } + + if (readCounterMap.get(fileName) == null) { + readCounterMap.put(fileName, 1) ; + } + else { + readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ; + } + + return new Tuple(line, ","); + } + + } + + public static class DummyStoreFunc implements StoreFunc { + + public static int writeCounter = 0 ; + private PrintWriter pw = null ; + + public void bindTo(OutputStream os) throws IOException { + pw = new PrintWriter(os) ; + } + + public void finish() throws IOException { + pw.close() ; + } + + public void putNext(Tuple tuple) throws IOException { + writeCounter++ ; + pw.println(tuple.toDelimitedString(",")); + } + } + +}