Author: olga
Date: Thu May  1 19:30:37 2008
New Revision: 652735

URL: http://svn.apache.org/viewvc?rev=652735&view=rev
Log:
PIG-226: fix for streaming optimization bug

Modified:
    incubator/pig/trunk/CHANGES.txt
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu May  1 19:30:37 2008
@@ -257,3 +257,6 @@
     PIG-151: fixes to code that handles bzip files
 
     PIG-222: fix build break
+
+    PIG-226: fix for streaming optimization bug
+

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 Thu May  1 19:30:37 2008
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.builtin.BinaryStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
@@ -73,27 +74,31 @@
                 
                 FileSpec loadFileSpec = load.getInputFileSpec();
                 
-                // Instantiate both LoadFunc objects to compare them for 
-                // equality
-                LoadFunc streamLoader = 
-                    (LoadFunc)PigContext.instantiateFuncFromSpec(
+                // Instantiate both to compare them for equality
+                StoreFunc streamStorer = 
+                    (StoreFunc)PigContext.instantiateFuncFromSpec(
                             streamInputSpec.getSpec());
                 
                 LoadFunc inputLoader = 
(LoadFunc)PigContext.instantiateFuncFromSpec(
                                                 loadFileSpec.getFuncSpec());
 
-                // Check if both LoadFunc objects belong to the same type
+                // Check if the streaming command's inputSpec also implements 
+                // LoadFunc and if it does, are they of the same type?
                 boolean sameType = false;
                 try {
-                    streamLoader.getClass().cast(inputLoader);
-                    sameType = true;
+                    // TODO: We should actually check if the streamStorer
+                    // is _reversible_ as the inputLoader ...
+                    if (streamStorer instanceof LoadFunc) {
+                        streamStorer.getClass().cast(inputLoader);
+                        sameType = true;
+                    }
                 } catch (ClassCastException cce) {
                     sameType = false;
                 }
                 
                 // Check if both LoadFunc objects belong to the same type and
                 // are equivalent
-                if (sameType && streamLoader.equals(inputLoader)) {
+                if (sameType && streamStorer.equals(inputLoader)) {
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     load.setInputFileSpec(new 
FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 Thu May  1 19:30:37 2008
@@ -19,6 +19,7 @@
 
 import java.util.List;
 
+import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.builtin.BinaryStorage;
 import org.apache.pig.impl.PigContext;
@@ -98,28 +99,32 @@
                 
                 FileSpec storeFileSpec = s.getOutputFileSpec();
                 
-                // Instantiate both StoreFunc objects to compare them for 
-                // equality
-                StoreFunc streamStorer = 
-                    (StoreFunc)PigContext.instantiateFuncFromSpec(
+                // Instantiate both to compare them for equality
+                LoadFunc streamLoader = 
+                    (LoadFunc)PigContext.instantiateFuncFromSpec(
                             streamOutputSpec.getSpec());
                 
                 StoreFunc outputStorer = 
(StoreFunc)PigContext.instantiateFuncFromSpec(
                                                 storeFileSpec.getFuncSpec());
                 
 
-                // Check if both LoadFunc objects belong to the same type
+                // Check if the streaming command's outputSpec also implements 
+                // StoreFunc and if it does, are they of the same type?
                 boolean sameType = false;
                 try {
-                    streamStorer.getClass().cast(outputStorer);
-                    sameType = true;
+                    // TODO: We should actually check if the streamLoader
+                    // is _reversible_ as the outputStorer ...
+                    if (streamLoader instanceof StoreFunc) {
+                        streamLoader.getClass().cast(outputStorer);
+                        sameType = true;
+                    }
                 } catch (ClassCastException cce) {
                     sameType = false;
                 }
                 
                 // Check if both LoadFunc objects belong to the same type and
                 // are equivalent
-                if (sameType && streamStorer.equals(outputStorer)) {
+                if (sameType && streamLoader.equals(outputStorer)) {
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     s.setOutputFileSpec(new 
FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May  1 
19:30:37 2008
@@ -470,4 +470,42 @@
         Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
     }
 
+    @Test
+    public void testLocalNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.LOCAL);
+    }
+    
+    @Test
+    public void testMRNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.MAPREDUCE);
+    }
+    
+    private void testNegativeLoadStoreOptimization(ExecType execType) 
+    throws Exception {
+            PigServer pigServer = createPigServer(execType);
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
"D,2",
+                                                        "A,5", "B,5", "C,8", 
"A,8",
+                                                        "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", 
"A"};
+        int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + 
+                                "` input(stdin using PigDump());");
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                PigStorage.class.getName() + "(',') " +
+                                "split by 'file';");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+        
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+    }
+
 }


Reply via email to