Added: 
hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java 
(added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java 
Thu Dec 11 14:29:29 2008
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStreamingLocal extends TestCase {
+
+    /** Factory used to build the expected tuples that query output is compared against. */
+    private TupleFactory tf = DefaultTupleFactory.getInstance();
+    /** Pig server under test; assigned in setUp() and shut down in tearDown(). */
+    PigServer pigServer;
+
+    /**
+     * Perl one-liner that prints every input line back out. The Windows
+     * variant differs only in the extra backslash escaping of the double
+     * quotes around $_.
+     */
+    private static final String simpleEchoStreamingCommand;
+    static {
+        // Upper-case with a fixed locale: with a Turkish default locale the
+        // "i" in "Windows" becomes a dotted capital I, so a default-locale
+        // toUpperCase() would never start with "WINDOWS".
+        String osName =
+            System.getProperty("os.name").toUpperCase(java.util.Locale.ENGLISH);
+        if (osName.startsWith("WINDOWS")) {
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        } else {
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+        }
+    }
+
+    /**
+     * Creates a new PigServer from the exec-type string "local" before a
+     * test runs.
+     */
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        this.pigServer = new PigServer("local");
+    }
+
+    /**
+     * Shuts down the PigServer that setUp() created.
+     */
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        this.pigServer.shutdown();
+    }
+
+    /**
+     * Pairs up two equally long columns into an array of two-field tuples.
+     *
+     * @param firstField  values for field 0, one per expected row
+     * @param secondField values for field 1, one per expected row
+     * @return one tuple per index, holding (firstField[i], secondField[i])
+     * @throws ExecException if a field cannot be set on a tuple
+     */
+    private Tuple[] setupExpectedResults(Object[] firstField,
+            Object[] secondField) throws ExecException {
+        // Both columns must describe the same number of rows.
+        Assert.assertEquals(firstField.length, secondField.length);
+
+        final int rowCount = firstField.length;
+        Tuple[] rows = new Tuple[rowCount];
+        for (int row = 0; row < rowCount; row++) {
+            rows[row] = tf.newTuple(2);
+            rows[row].set(0, firstField[row]);
+            rows[row].set(1, secondField[row]);
+        }
+        return rows;
+    }
+
+    /**
+     * Loads comma-separated rows, filters them, and streams the result
+     * through the echo command twice. The query is run once with a typed
+     * schema on the final stream and once without a schema; in the second
+     * case the expected fields are converted with Util.toDataByteArrays.
+     */
+    @Test
+    public void testSimpleMapSideStreaming() throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1", "B,2", "C,3", "D,2", "A,5",
+                "B,5", "C,8", "A,8", "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields = {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = {5, 5, 8, 8, 8, 9};
+
+        for (boolean typed : new boolean[] {true, false}) {
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(expectedFirstFields,
+                        expectedSecondFields)
+                : setupExpectedResults(
+                        Util.toDataByteArrays(expectedFirstFields),
+                        Util.toDataByteArrays(expectedSecondFields));
+
+            // Pig query to run
+            pigServer.registerQuery("IP = load 'file:"
+                    + Util.encodeEscape(input.toString()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            pigServer.registerQuery("S1 = stream FILTERED_DATA through `"
+                    + simpleEchoStreamingCommand + "`;");
+            // Only the typed run attaches a schema to the final stream.
+            String schemaClause = typed ? " as (f0:chararray, f1:int)" : "";
+            pigServer.registerQuery("OP = stream S1 through `"
+                    + simpleEchoStreamingCommand + "`" + schemaClause + ";");
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"),
+                    expectedResults);
+        }
+    }
+
+    /**
+     * Streams filtered rows through the echo command with an output schema
+     * and then filters the streamed rows on field f1. The query is run once
+     * with a typed schema (f0:chararray, f1:int) and once with the untyped
+     * schema (f0, f1); in the second case the expected fields are converted
+     * with Util.toDataByteArrays.
+     */
+    @Test
+    public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1", "B,2", "C,3", "D,2", "A,5",
+                "B,5", "C,8", "A,8", "D,8", "A,9"});
+
+        // Expected results
+        Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
+        Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};
+
+        for (boolean typed : new boolean[] {true, false}) {
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(expectedFirstFields,
+                        expectedSecondFields)
+                : setupExpectedResults(
+                        Util.toDataByteArrays(expectedFirstFields),
+                        Util.toDataByteArrays(expectedSecondFields));
+
+            // Pig query to run
+            pigServer.registerQuery("IP = load 'file:"
+                    + Util.encodeEscape(input.toString()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            // Both runs declare a schema; only the typed run names types.
+            String schema = typed ? "(f0:chararray, f1:int)" : "(f0, f1)";
+            pigServer.registerQuery(
+                    "STREAMED_DATA = stream FILTERED_DATA through `"
+                    + simpleEchoStreamingCommand + "` as " + schema + ";");
+            pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"),
+                    expectedResults);
+        }
+    }
+
+    /**
+     * Filters and groups the input by its first field, flattens the groups
+     * again, and streams the flattened rows through the echo command twice.
+     * The query is run once with a typed schema on the final stream and once
+     * without a schema; in the second case the expected fields are converted
+     * with Util.toDataByteArrays.
+     */
+    @Test
+    public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1", "B,2", "C,3", "D,2", "A,5",
+                "B,5", "C,8", "A,8", "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields = {"A", "A", "A", "B", "C", "D"};
+        Integer[] expectedSecondFields = {5, 8, 9, 5, 8, 8};
+
+        for (boolean typed : new boolean[] {true, false}) {
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(expectedFirstFields,
+                        expectedSecondFields)
+                : setupExpectedResults(
+                        Util.toDataByteArrays(expectedFirstFields),
+                        Util.toDataByteArrays(expectedSecondFields));
+
+            // Pig query to run
+            pigServer.registerQuery("IP = load 'file:"
+                    + Util.encodeEscape(input.toString()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            pigServer.registerQuery(
+                    "GROUPED_DATA = group FILTERED_DATA by $0;");
+            pigServer.registerQuery(
+                    "FLATTENED_GROUPED_DATA = foreach GROUPED_DATA "
+                    + "generate flatten($1);");
+            pigServer.registerQuery(
+                    "S1 = stream FLATTENED_GROUPED_DATA through `"
+                    + simpleEchoStreamingCommand + "`;");
+            // Only the typed run attaches a schema to the final stream.
+            String schemaClause = typed ? " as (f0:chararray, f1:int)" : "";
+            pigServer.registerQuery("OP = stream S1 through `"
+                    + simpleEchoStreamingCommand + "`" + schemaClause + ";");
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"),
+                    expectedResults);
+        }
+    }
+
+    /**
+     * Groups the four-column input by its first field, orders each group by
+     * the third and fourth fields inside a nested foreach, flattens it, and
+     * streams the rows through the echo command twice with a typed schema
+     * on the final stream.
+     */
+    @Test
+    public void testSimpleOrderedReduceSideStreamingAfterFlatten()
+    throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1,2,3", "B,2,4,5", "C,3,1,2", "D,2,5,2", "A,5,5,1",
+                "B,5,7,4", "C,8,9,2", "A,8,4,5", "D,8,8,3", "A,9,2,5"});
+
+        // Expected results, one row per entry: (f0, f1, f2, f3)
+        Object[][] expectedRows = {
+            {"A", 1, 2, 3}, {"A", 9, 2, 5}, {"A", 8, 4, 5}, {"A", 5, 5, 1},
+            {"B", 2, 4, 5}, {"B", 5, 7, 4},
+            {"C", 3, 1, 2}, {"C", 8, 9, 2},
+            {"D", 2, 5, 2}, {"D", 8, 8, 3},
+        };
+        Tuple[] expectedResults = new Tuple[expectedRows.length];
+        for (int row = 0; row < expectedRows.length; row++) {
+            expectedResults[row] = tf.newTuple(4);
+            for (int col = 0; col < 4; col++) {
+                expectedResults[row].set(col, expectedRows[row][col]);
+            }
+        }
+
+        // Pig query to run
+        pigServer.registerQuery("IP = load 'file:"
+                + Util.encodeEscape(input.toString()) + "' using "
+                + PigStorage.class.getName() + "(',');");
+        // FILTERED_DATA, S1 and S2 are registered, but GROUPED_DATA below is
+        // built from IP, so OP does not read them and all ten input rows
+        // are expected back.
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("S1 = stream FILTERED_DATA through `"
+                + simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("S2 = stream S1 through `"
+                + simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
+        pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { "
+                + "  D = order IP BY $2, $3;"
+                + "  generate flatten(D);"
+                + "};");
+        pigServer.registerQuery("S3 = stream ORDERED_DATA through `"
+                + simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("OP = stream S3 through `"
+                + simpleEchoStreamingCommand
+                + "` as (f0:chararray, f1:int, f2:int, f3:int);");
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+    }
+
+    /**
+     * Defines CMD as the echo command piped into itself and streams the
+     * unfiltered input through it. The query is run once with a typed schema
+     * and once without a schema; in the second case the expected fields are
+     * converted with Util.toDataByteArrays.
+     */
+    @Test
+    public void testSimpleMapSideStreamingWithUnixPipes() throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1", "B,2", "C,3", "D,2", "A,5",
+                "B,5", "C,8", "A,8", "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields =
+            {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
+
+        for (boolean typed : new boolean[] {true, false}) {
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(expectedFirstFields,
+                        expectedSecondFields)
+                : setupExpectedResults(
+                        Util.toDataByteArrays(expectedFirstFields),
+                        Util.toDataByteArrays(expectedSecondFields));
+
+            // Pig query to run
+            pigServer.registerQuery("define CMD `"
+                    + simpleEchoStreamingCommand + " | "
+                    + simpleEchoStreamingCommand + "`;");
+            pigServer.registerQuery("IP = load 'file:"
+                    + Util.encodeEscape(input.toString()) + "' using "
+                    + PigStorage.class.getName() + "(',');");
+            // Only the typed run attaches a schema to the stream.
+            String schemaClause = typed ? " as (f0:chararray, f1:int)" : "";
+            pigServer.registerQuery(
+                    "OP = stream IP through CMD" + schemaClause + ";");
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"),
+                    expectedResults);
+        }
+    }
+
+    /**
+     * Runs the negative load/store optimization scenario with
+     * ExecType.LOCAL.
+     */
+    @Test
+    public void testLocalNegativeLoadStoreOptimization() throws Exception {
+        final ExecType localMode = ExecType.LOCAL;
+        testNegativeLoadStoreOptimization(localMode);
+    }
+
+    /**
+     * Loads the input with a "split by 'file'" clause, filters it, and
+     * streams the result through the echo command. The query is run once
+     * with a typed schema and once without a schema; in the second case the
+     * expected fields are converted with Util.toDataByteArrays.
+     *
+     * @param execType requested execution type; this method does not read
+     *                 it and uses the pigServer created in setUp()
+     */
+    private void testNegativeLoadStoreOptimization(ExecType execType)
+    throws Exception {
+        File input = Util.createInputFile("tmp", "", new String[] {
+                "A,1", "B,2", "C,3", "D,2", "A,5",
+                "B,5", "C,8", "A,8", "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields = {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = {5, 5, 8, 8, 8, 9};
+
+        for (boolean typed : new boolean[] {true, false}) {
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(expectedFirstFields,
+                        expectedSecondFields)
+                : setupExpectedResults(
+                        Util.toDataByteArrays(expectedFirstFields),
+                        Util.toDataByteArrays(expectedSecondFields));
+
+            // Pig query to run
+            // CMD is defined with a PigDump stdin spec, but OP below streams
+            // through the inline command and does not reference CMD.
+            pigServer.registerQuery("define CMD `"
+                    + simpleEchoStreamingCommand
+                    + "` input(stdin using PigDump);");
+            pigServer.registerQuery("IP = load 'file:"
+                    + Util.encodeEscape(input.toString()) + "' using "
+                    + PigStorage.class.getName() + "(',') "
+                    + "split by 'file';");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            // Only the typed run attaches a schema to the stream.
+            String schemaClause = typed ? " as (f0:chararray, f1:int)" : "";
+            pigServer.registerQuery("OP = stream FILTERED_DATA through `"
+                    + simpleEchoStreamingCommand + "`" + schemaClause + ";");
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"),
+                    expectedResults);
+        }
+    }
+}


Reply via email to