Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java?rev=893019&r1=893018&r2=893019&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java
 Mon Dec 21 22:42:57 2009
@@ -18,32 +18,50 @@
 package org.apache.pig.test;
 
 import java.io.File;
-import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.streaming.PigStreaming;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-public class TestStreaming extends PigExecTestCase {
+public class TestStreaming {
+
+    private static final MiniCluster cluster = MiniCluster.buildCluster();
+    
+    private PigServer pigServer;
+    
+    @Before
+    public void setup() throws ExecException {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @After
+    public void tearDown() {
+        pigServer = null;
+    }
     
     private TupleFactory tf = DefaultTupleFactory.getInstance();
 
        private static final String simpleEchoStreamingCommand;
-        static {
-            if 
(System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
-                simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
-            else
-                simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
-        }
+    
+       static {
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        else
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+    }
 
     private Tuple[] setupExpectedResults(Object[] firstField, Object[] 
secondField) throws ExecException {
                Assert.assertEquals(firstField.length, secondField.length);
@@ -59,8 +77,7 @@
        }
        
        @Test
-       public void testSimpleMapSideStreaming() 
-       throws Exception {
+       public void testSimpleMapSideStreaming() throws Exception {
                File input = Util.createInputFile("tmp", "", 
                                                          new String[] {"A,1", 
"B,2", "C,3", "D,2",
                                                                        "A,5", 
"B,5", "C,8", "A,8",
@@ -77,12 +94,15 @@
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
                 expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                         
Util.toDataByteArrays(expectedSecondFields));
             }
     
                // Pig query to run
-               pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                               PigStorage.class.getName() + 
"(',');");
+            pigServer.registerQuery("IP = load '"
+                    + Util.generateURI(Util.encodeEscape(input.toString()),
+                            pigServer.getPigContext()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
                pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 
'3';");
             pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
                                     simpleEchoStreamingCommand + "`;");
@@ -119,11 +139,14 @@
                        setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
                    } else {
                        expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                         
Util.toDataByteArrays(expectedSecondFields));
                    }
                // Pig query to run
-               pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                       PigStorage.class.getName() + "(',');");
+            pigServer.registerQuery("IP = load '"
+                    + Util.generateURI(Util.encodeEscape(input.toString()),
+                            pigServer.getPigContext()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
                pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 
'3';");
                if(withTypes[i] == true) {
                    pigServer.registerQuery("STREAMED_DATA = stream 
FILTERED_DATA through `" +
@@ -158,12 +181,15 @@
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
                 expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                         
Util.toDataByteArrays(expectedSecondFields));
             }
 
                // Pig query to run
-               pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                               PigStorage.class.getName() + 
"(',');");
+            pigServer.registerQuery("IP = load '"
+                    + Util.generateURI(Util.encodeEscape(input.toString()),
+                            pigServer.getPigContext()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
                pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 
'3';");
                pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by 
$0;");
                pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach 
GROUPED_DATA " +
@@ -210,8 +236,10 @@
                        //setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
 
                // Pig query to run
-               pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                               PigStorage.class.getName() + 
"(',');");
+        pigServer.registerQuery("IP = load '"
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using "
+                + PigStorage.class.getName() + "(',');");
                pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 
'3';");
         pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
                                 simpleEchoStreamingCommand + "`;");
@@ -233,9 +261,7 @@
 
     @Test
     public void testInputShipSpecs() throws Exception {
-        // FIXME : this should be tested in all modes
-        if(execType == ExecType.LOCAL)
-            return;
+
         File input = Util.createInputFile("tmp", "", 
                                           new String[] {"A,1", "B,2", "C,3", 
                                                         "D,2", "A,5", "B,5", 
@@ -261,24 +287,27 @@
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults =
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
         
         pigServer.registerQuery(
                 "define CMD1 `" + command1.getName() + " foo` " +
                 "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
-                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
-                "output(stdout using " + PigStorage.class.getName() + "(',')) 
" +
+                "input('foo' using " + PigStreaming.class.getName() + "(',')) 
" +
+                "output(stdout using " + PigStreaming.class.getName() + 
"(',')) " +
                 "stderr();"); 
         pigServer.registerQuery(
                 "define CMD2 `" + command2.getName() + " bar` " +
                 "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
-                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
-                "output(stdout using " + PigStorage.class.getName() + "(',')) 
" +        
+                "input('bar' using " + PigStreaming.class.getName() + "(',')) 
" +
+                "output(stdout using " + PigStreaming.class.getName() + 
"(',')) " +        
                 "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("IP = load '"
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext())
+                + "' using PigStorage(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                                        "through CMD1;");
@@ -288,13 +317,12 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());            
         }
 
         // Run the query and check the results
@@ -303,9 +331,6 @@
 
     @Test
     public void testInputShipSpecsWithUDFDefine() throws Exception {
-        // FIXME : this should be tested in all modes
-        if(execType == ExecType.LOCAL)
-            return;
         File input = Util.createInputFile("tmp", "", 
                                           new String[] {"A,1", "B,2", "C,3", 
                                                         "D,2", "A,5", "B,5", 
@@ -331,16 +356,16 @@
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults =
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
-        
-        pigServer.registerQuery(
-                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("define PS " + PigStreaming.class.getName() + 
"(',');");
+
         pigServer.registerQuery(
                 "define CMD1 `" + command1.getName() + " foo` " +
                 "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
-                "input('foo' using PS ) " +
+                "input('foo' using PS )" +
                 "output(stdout using PS ) " +
                 "stderr();"); 
         pigServer.registerQuery(
@@ -349,7 +374,9 @@
                 "input('bar' using PS ) " +
                 "output(stdout using PS ) " +        
                 "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using PS ;");
+        pigServer.registerQuery("IP = load '" 
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using 
PigStorage(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                                        "through CMD1;");
@@ -359,13 +386,12 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());    
         }
 
         // Run the query and check the results
@@ -373,11 +399,7 @@
     }
 
     @Test
-    public void testInputCacheSpecs() throws Exception {
-        // Can't run this without HDFS
-        if(execType == ExecType.LOCAL)
-            return;
-        
+    public void testInputCacheSpecs() throws Exception {        
         File input = Util.createInputFile("tmp", "", 
                                           new String[] {"A,1", "B,2", "C,3", 
                                                         "D,2", "A,5", "B,5", 
@@ -408,21 +430,24 @@
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults = 
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
         pigServer.registerQuery(
                 "define CMD1 `script1.pl foo` " +
                 "cache ('" + c1 + "#script1.pl') " +
-                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "input('foo' using " + PigStreaming.class.getName() + "(',')) 
" +
                 "stderr();"); 
         pigServer.registerQuery(
                 "define CMD2 `script2.pl bar` " +
                 "cache ('" + c2 + "#script2.pl') " +
-                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "input('bar' using " + PigStreaming.class.getName() + "(',')) 
" +
                 "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("IP = load '"
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using "
+                + PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                                 "through CMD1;");
@@ -432,24 +457,20 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());            
         }
-
+        
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
 
     @Test
        public void testOutputShipSpecs() throws Exception {
-        // FIXME : this should be tested in all modes
-        if(execType == ExecType.LOCAL)
-            return;
            File input = Util.createInputFile("tmp", "", 
                                              new String[] {"A,1", "B,2", 
"C,3", 
                                                            "D,2", "A,5", 
"B,5", 
@@ -475,17 +496,20 @@
             new String[] {"A", "A", "A", "A", "A", "A"};
         Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 
10};
         Tuple[] expectedResults = 
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
         pigServer.registerQuery(
                 "define CMD `" + command.getName() + " foo bar` " +
                 "ship ('" + Util.encodeEscape(command.toString()) + "') " +
-                       "output('foo' using " + PigStorage.class.getName() + 
"(','), " +
-                       "'bar' using " + PigStorage.class.getName() + "(',')) " 
+
+                       "output('foo' using " + PigStreaming.class.getName() + 
"(','), " +
+                       "'bar' using " + PigStreaming.class.getName() + "(',')) 
" +
                        "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("IP = load '" 
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using " 
+                + PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");     
           
         
@@ -493,25 +517,20 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output+"/bar", 
-                                            pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "/bar" + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());         
         }
-
+        
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
 
     @Test
        public void testOutputShipSpecsWithUDFDefine() throws Exception {
-        // FIXME : this should be tested in all modes
-        if(execType == ExecType.LOCAL)
-            return;
            File input = Util.createInputFile("tmp", "", 
                                              new String[] {"A,1", "B,2", 
"C,3", 
                                                            "D,2", "A,5", 
"B,5", 
@@ -537,18 +556,22 @@
             new String[] {"A", "A", "A", "A", "A", "A"};
         Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 
10};
         Tuple[] expectedResults = 
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
         pigServer.registerQuery(
-                "define PS " + PigStorage.class.getName() + "(',');");
+                "define PS " + PigStreaming.class.getName() + "(',');");
+
         pigServer.registerQuery(
                 "define CMD `" + command.getName() + " foo bar` " +
                 "ship ('" + Util.encodeEscape(command.toString()) + "') " +
                        "output('foo' using PS, " +
                        "'bar' using PS) " +
                        "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using PS;");
+        pigServer.registerQuery("IP = load '" 
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using 
PigStorage(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");     
           
         
@@ -556,24 +579,20 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output+"/bar", 
-                                            pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "/bar" + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());          
         }
-
+        
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
+    
     @Test
     public void testInputOutputSpecs() throws Exception {
-        // FIXME : this should be tested in all modes
-        if(execType == ExecType.LOCAL)
-            return;
         File input = Util.createInputFile("tmp", "", 
                                           new String[] {"A,1", "B,2", "C,3", 
                                                         "D,2", "A,5", "B,5", 
@@ -601,17 +620,20 @@
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults = 
-                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                     
Util.toDataByteArrays(expectedSecondFields));
         // Pig query to run
         pigServer.registerQuery(
                 "define CMD `" + command.getName() + " foo bar foobar` " +
                 "ship ('" + Util.encodeEscape(command.toString()) + "') " +
-                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "input('foo' using " + PigStreaming.class.getName() + "(',')) 
" +
                 "output('bar', " +
-                "'foobar' using " + PigStorage.class.getName() + "(',')) " +
+                "'foobar' using " + PigStreaming.class.getName() + "(',')) " +
                 "stderr();"); 
-        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("IP = load '" 
+                + Util.generateURI(Util.encodeEscape(input.toString()),
+                        pigServer.getPigContext()) + "' using " 
+                + PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");     
           
         
@@ -619,16 +641,14 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output+"/foobar", 
-                                            pigServer.getPigContext());
-        PigStorage ps = new PigStorage(",");
-        ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
+        pigServer.registerQuery("A = load '" + output + "/foobar" + "' using 
PigStorage(',');");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
+        
         List<Tuple> outputs = new ArrayList<Tuple>();
-        Tuple t;
-        while ((t = ps.getNext()) != null) {
-            outputs.add(t);
+        while (iter.hasNext()) {
+            outputs.add(iter.next());            
         }
-
+        
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
         
@@ -656,14 +676,17 @@
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
                 expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                         
Util.toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
             pigServer.registerQuery("define CMD `" + 
simpleEchoStreamingCommand + 
                                     " | " + simpleEchoStreamingCommand + "`;");
-            pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                    PigStorage.class.getName() + "(',');");
+            pigServer.registerQuery("IP = load '" 
+                    + Util.generateURI(Util.encodeEscape(input.toString()),
+                            pigServer.getPigContext()) + "' using " 
+                    + PigStorage.class.getName() + "(',');");
             if(withTypes[i] == true) {
                 pigServer.registerQuery("OP = stream IP through CMD as 
(f0:chararray, f1:int);");
             } else {
@@ -676,16 +699,7 @@
     }
 
     @Test
-    public void testLocalNegativeLoadStoreOptimization() throws Exception {
-        testNegativeLoadStoreOptimization(ExecType.LOCAL);
-    }
-    
-    @Test
-    public void testMRNegativeLoadStoreOptimization() throws Exception {
-        testNegativeLoadStoreOptimization(ExecType.MAPREDUCE);
-    }
-    
-    private void testNegativeLoadStoreOptimization(ExecType execType) 
+    public void testNegativeLoadStoreOptimization() 
     throws Exception {
         File input = Util.createInputFile("tmp", "", 
                                           new String[] {"A,1", "B,2", "C,3", 
"D,2",
@@ -703,15 +717,17 @@
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
                 expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                                         
Util.toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
             pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand 
+ 
-                                    "` input(stdin using PigDump);");
-            pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
-                                    PigStorage.class.getName() + "(',') " +
-                                    "split by 'file';");
+                                    "` input(stdin);");
+            pigServer.registerQuery("IP = load '" 
+                    + Util.generateURI(Util.encodeEscape(input.toString()),
+                            pigServer.getPigContext()) + "' using " 
+                    + PigStorage.class.getName() + "(',') " + "split by 
'file';");
             pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
             if(withTypes[i] == true) {
                 pigServer.registerQuery("OP = stream FILTERED_DATA through `" +

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java?rev=893019&r1=893018&r2=893019&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java
 Mon Dec 21 22:42:57 2009
@@ -49,10 +49,7 @@
     @Before
     @Override
     protected void setUp() throws Exception {
-
-
         pigServer = new PigServer("local");
-
     }
 
     @After
@@ -61,7 +58,8 @@
         pigServer.shutdown();
     }
 
-    private Tuple[] setupExpectedResults(Object[] firstField, Object[] 
secondField) throws ExecException {
+    private Tuple[] setupExpectedResults(Object[] firstField,
+            Object[] secondField) throws ExecException {
         Assert.assertEquals(firstField.length, secondField.length);
 
         Tuple[] expectedResults = new Tuple[firstField.length];
@@ -92,8 +90,9 @@
                 expectedResults = 
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
-                expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                expectedResults = setupExpectedResults(Util
+                        .toDataByteArrays(expectedFirstFields), Util
+                        .toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
@@ -134,8 +133,9 @@
                 expectedResults = 
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
-                expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                expectedResults = setupExpectedResults(Util
+                        .toDataByteArrays(expectedFirstFields), Util
+                        .toDataByteArrays(expectedSecondFields));
             }
             // Pig query to run
             pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
@@ -173,8 +173,9 @@
                 expectedResults = 
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
-                expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                expectedResults = setupExpectedResults(Util
+                        .toDataByteArrays(expectedFirstFields), Util
+                        .toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
@@ -308,13 +309,14 @@
                 expectedResults = 
                     setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
             } else {
-                expectedResults = 
-                    
setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
Util.toDataByteArrays(expectedSecondFields));
+                expectedResults = setupExpectedResults(Util
+                        .toDataByteArrays(expectedFirstFields), Util
+                        .toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
             pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand 
+ 
-            "` input(stdin using PigDump);");
+            "` input(stdin);");
             pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
                     PigStorage.class.getName() + "(',') " +
             "split by 'file';");

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=893019&r1=893018&r2=893019&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java
 Mon Dec 21 22:42:57 2009
@@ -5244,7 +5244,7 @@
 
         LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
         
-        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
+        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming"));
 
     }
 
@@ -5276,7 +5276,7 @@
 
         LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
         
-        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
+        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming"));
 
         foreachPlan = foreach.getForEachPlans().get(1);
 
@@ -5323,7 +5323,7 @@
         exOp = foreachPlan.getRoots().get(0);
         if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
         cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
+        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming"));
 
     }
 
@@ -5363,7 +5363,7 @@
         exOp = foreachPlan.getRoots().get(0);
         if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
         cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
+        
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming"));
 
     }
 

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=893019&r1=893018&r2=893019&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java 
(original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java 
Mon Dec 21 22:42:57 2009
@@ -309,20 +309,19 @@
     }
     
     /**
-        * Helper function to check if the result of a Pig Query is in line 
with 
-        * expected results.
-        * 
-        * @param actualResults Result of the executed Pig query
-        * @param expectedResults Expected results to validate against
-        */
-       static public void checkQueryOutputs(Iterator<Tuple> actualResults, 
-                                               Tuple[] expectedResults) {
-           
-               for (Tuple expected : expectedResults) {
-                       Tuple actual = actualResults.next();
-                       Assert.assertEquals(expected, actual);
-               }
-       }
+    * Helper function to check if the result of a Pig Query is in line with 
+    * expected results.
+    * 
+    * @param actualResults Result of the executed Pig query
+    * @param expectedResults Expected results to validate against
+    */
+    static public void checkQueryOutputs(Iterator<Tuple> actualResults, 
+                                    Tuple[] expectedResults) {
+        for (Tuple expected : expectedResults) {
+            Tuple actual = actualResults.next();
+            Assert.assertEquals(expected.toString(), actual.toString());
+        }
+    }
 
        /**
         * Utility method to copy a file form local filesystem to the dfs on


Reply via email to