Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java?rev=893019&r1=893018&r2=893019&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreaming.java Mon Dec 21 22:42:57 2009 @@ -18,32 +18,50 @@ package org.apache.pig.test; import java.io.File; -import java.io.InputStream; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DefaultTupleFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.streaming.PigStreaming; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -public class TestStreaming extends PigExecTestCase { +public class TestStreaming { + + private static final MiniCluster cluster = MiniCluster.buildCluster(); + + private PigServer pigServer; + + @Before + public void setup() throws ExecException { + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } + + @After + public void tearDown() { + pigServer = null; + } private TupleFactory tf = DefaultTupleFactory.getInstance(); private static final String simpleEchoStreamingCommand; - static { - if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) - simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; - else - simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'"; - } + + static { + if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) + simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; + else + simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'"; + } private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException { Assert.assertEquals(firstField.length, secondField.length); @@ -59,8 +77,7 @@ } @Test - public void testSimpleMapSideStreaming() - throws Exception { + public void testSimpleMapSideStreaming() throws Exception { File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", "C,8", "A,8", @@ -77,12 +94,15 @@ setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); } // Pig query to run - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); pigServer.registerQuery("S1 = stream FILTERED_DATA through `" + simpleEchoStreamingCommand + "`;"); @@ -119,11 +139,14 @@ setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); } // Pig query to run - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); if(withTypes[i] == true) { pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" + @@ -158,12 +181,15 @@ setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); } // Pig query to run - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;"); pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " + @@ -210,8 +236,10 @@ //setupExpectedResults(expectedFirstFields, expectedSecondFields); // Pig query to run - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); pigServer.registerQuery("S1 = stream FILTERED_DATA through `" + simpleEchoStreamingCommand + "`;"); @@ -233,9 +261,7 @@ @Test public void testInputShipSpecs() throws Exception { - // FIXME : this should be tested in all modes - if(execType == ExecType.LOCAL) - return; + File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -261,24 +287,27 @@ new String[] {"A", "B", "C", "A", "D", "A"}; Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run pigServer.registerQuery( "define CMD1 `" + command1.getName() + " foo` " + "ship ('" + Util.encodeEscape(command1.toString()) + "') " + - "input('foo' using " + PigStorage.class.getName() + "(',')) " + - "output(stdout using " + PigStorage.class.getName() + "(',')) " + + "input('foo' using " + PigStreaming.class.getName() + "(',')) " + + "output(stdout using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); pigServer.registerQuery( "define CMD2 `" + command2.getName() + " bar` " + "ship ('" + Util.encodeEscape(command2.toString()) + "') " + - "input('bar' using " + PigStorage.class.getName() + "(',')) " + - "output(stdout using " + PigStorage.class.getName() + "(',')) " + + "input('bar' using " + PigStreaming.class.getName() + "(',')) " + + "output(stdout using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + + "' using PigStorage(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + "through CMD1;"); @@ -288,13 +317,12 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } // Run the query and check the results @@ -303,9 +331,6 @@ @Test public void testInputShipSpecsWithUDFDefine() throws Exception { - // FIXME : this should be tested in all modes - if(execType == ExecType.LOCAL) - return; File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -331,16 +356,16 @@ new String[] {"A", "B", "C", "A", "D", "A"}; Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run - - pigServer.registerQuery( - "define PS " + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("define PS " + PigStreaming.class.getName() + "(',');"); + pigServer.registerQuery( "define CMD1 `" + command1.getName() + " foo` " + "ship ('" + Util.encodeEscape(command1.toString()) + "') " + - "input('foo' using PS ) " + + "input('foo' using PS )" + "output(stdout using PS ) " + "stderr();"); pigServer.registerQuery( @@ -349,7 +374,9 @@ "input('bar' using PS ) " + "output(stdout using PS ) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS ;"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using PigStorage(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + "through CMD1;"); @@ -359,13 +386,12 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } // Run the query and check the results @@ -373,11 +399,7 @@ } @Test - public void testInputCacheSpecs() throws Exception { - // Can't run this without HDFS - if(execType == ExecType.LOCAL) - return; - + public void testInputCacheSpecs() throws Exception { File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -408,21 +430,24 @@ new String[] {"A", "B", "C", "A", "D", "A"}; Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run pigServer.registerQuery( "define CMD1 `script1.pl foo` " + "cache ('" + c1 + "#script1.pl') " + - "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "input('foo' using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); pigServer.registerQuery( "define CMD2 `script2.pl bar` " + "cache ('" + c2 + "#script2.pl') " + - "input('bar' using " + PigStorage.class.getName() + "(',')) " + + "input('bar' using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + "through CMD1;"); @@ -432,24 +457,20 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } - + // Run the query and check the results Util.checkQueryOutputs(outputs.iterator(), expectedResults); } @Test public void testOutputShipSpecs() throws Exception { - // FIXME : this should be tested in all modes - if(execType == ExecType.LOCAL) - return; File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -475,17 +496,20 @@ new String[] {"A", "A", "A", "A", "A", "A"}; Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run pigServer.registerQuery( "define CMD `" + command.getName() + " foo bar` " + "ship ('" + Util.encodeEscape(command.toString()) + "') " + - "output('foo' using " + PigStorage.class.getName() + "(','), " + - "'bar' using " + PigStorage.class.getName() + "(',')) " + + "output('foo' using " + PigStreaming.class.getName() + "(','), " + + "'bar' using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); @@ -493,25 +517,20 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output+"/bar", - pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } - + // Run the query and check the results Util.checkQueryOutputs(outputs.iterator(), expectedResults); } @Test public void testOutputShipSpecsWithUDFDefine() throws Exception { - // FIXME : this should be tested in all modes - if(execType == ExecType.LOCAL) - return; File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -537,18 +556,22 @@ new String[] {"A", "A", "A", "A", "A", "A"}; Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run pigServer.registerQuery( - "define PS " + PigStorage.class.getName() + "(',');"); + "define PS " + PigStreaming.class.getName() + "(',');"); + pigServer.registerQuery( "define CMD `" + command.getName() + " foo bar` " + "ship ('" + Util.encodeEscape(command.toString()) + "') " + "output('foo' using PS, " + "'bar' using PS) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS;"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using PigStorage(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); @@ -556,24 +579,20 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output+"/bar", - pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } - + // Run the query and check the results Util.checkQueryOutputs(outputs.iterator(), expectedResults); } + @Test public void testInputOutputSpecs() throws Exception { - // FIXME : this should be tested in all modes - if(execType == ExecType.LOCAL) - return; File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", "A,5", "B,5", @@ -601,17 +620,20 @@ new String[] {"A", "B", "C", "A", "D", "A"}; Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; Tuple[] expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); // Pig query to run pigServer.registerQuery( "define CMD `" + command.getName() + " foo bar foobar` " + "ship ('" + Util.encodeEscape(command.toString()) + "') " + - "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "input('foo' using " + PigStreaming.class.getName() + "(',')) " + "output('bar', " + - "'foobar' using " + PigStorage.class.getName() + "(',')) " + + "'foobar' using " + PigStreaming.class.getName() + "(',')) " + "stderr();"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); @@ -619,16 +641,14 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output+"/foobar", - pigServer.getPigContext()); - PigStorage ps = new PigStorage(","); - ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + pigServer.registerQuery("A = load '" + output + "/foobar" + "' using PigStorage(',');"); + Iterator<Tuple> iter = pigServer.openIterator("A"); + List<Tuple> outputs = new ArrayList<Tuple>(); - Tuple t; - while ((t = ps.getNext()) != null) { - outputs.add(t); + while (iter.hasNext()) { + outputs.add(iter.next()); } - + // Run the query and check the results Util.checkQueryOutputs(outputs.iterator(), expectedResults); @@ -656,14 +676,17 @@ setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); } // Pig query to run pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand + " | " + simpleEchoStreamingCommand + "`;"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',');"); if(withTypes[i] == true) { pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);"); } else { @@ -676,16 +699,7 @@ } @Test - public void testLocalNegativeLoadStoreOptimization() throws Exception { - testNegativeLoadStoreOptimization(ExecType.LOCAL); - } - - @Test - public void testMRNegativeLoadStoreOptimization() throws Exception { - testNegativeLoadStoreOptimization(ExecType.MAPREDUCE); - } - - private void testNegativeLoadStoreOptimization(ExecType execType) + public void testNegativeLoadStoreOptimization() throws Exception { File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2", "C,3", "D,2", @@ -703,15 +717,17 @@ setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), + Util.toDataByteArrays(expectedSecondFields)); } // Pig query to run pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + - "` input(stdin using PigDump);"); - pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + - PigStorage.class.getName() + "(',') " + - "split by 'file';"); + "` input(stdin);"); + pigServer.registerQuery("IP = load '" + + Util.generateURI(Util.encodeEscape(input.toString()), + pigServer.getPigContext()) + "' using " + + PigStorage.class.getName() + "(',') " + "split by 'file';"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); if(withTypes[i] == true) { pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java?rev=893019&r1=893018&r2=893019&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java Mon Dec 21 22:42:57 2009 @@ -49,10 +49,7 @@ @Before @Override protected void setUp() throws Exception { - - pigServer = new PigServer("local"); - } @After @@ -61,7 +58,8 @@ pigServer.shutdown(); } - private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException { + private Tuple[] setupExpectedResults(Object[] firstField, + Object[] secondField) throws ExecException { Assert.assertEquals(firstField.length, secondField.length); Tuple[] expectedResults = new Tuple[firstField.length]; @@ -92,8 +90,9 @@ expectedResults = setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { - expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + expectedResults = setupExpectedResults(Util + .toDataByteArrays(expectedFirstFields), Util + .toDataByteArrays(expectedSecondFields)); } // Pig query to run @@ -134,8 +133,9 @@ expectedResults = setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { - expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + expectedResults = setupExpectedResults(Util + .toDataByteArrays(expectedFirstFields), Util + .toDataByteArrays(expectedSecondFields)); } // Pig query to run pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + @@ -173,8 +173,9 @@ expectedResults = setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { - expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + expectedResults = setupExpectedResults(Util + .toDataByteArrays(expectedFirstFields), Util + .toDataByteArrays(expectedSecondFields)); } // Pig query to run @@ -308,13 +309,14 @@ expectedResults = setupExpectedResults(expectedFirstFields, expectedSecondFields); } else { - expectedResults = - setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + expectedResults = setupExpectedResults(Util + .toDataByteArrays(expectedFirstFields), Util + .toDataByteArrays(expectedSecondFields)); } // Pig query to run pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + - "` input(stdin using PigDump);"); + "` input(stdin);"); pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + PigStorage.class.getName() + "(',') " + "split by 'file';"); Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=893019&r1=893018&r2=893019&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java Mon Dec 21 22:42:57 2009 @@ -5244,7 +5244,7 @@ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); + assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming")); } @@ -5276,7 +5276,7 @@ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); + assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming")); foreachPlan = foreach.getForEachPlans().get(1); @@ -5323,7 +5323,7 @@ exOp = foreachPlan.getRoots().get(0); if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1); cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); + assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming")); } @@ -5363,7 +5363,7 @@ exOp = foreachPlan.getRoots().get(0); if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1); cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); + assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.impl.streaming.PigStreaming")); } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=893019&r1=893018&r2=893019&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Mon Dec 21 22:42:57 2009 @@ -309,20 +309,19 @@ } /** - * Helper function to check if the result of a Pig Query is in line with - * expected results. - * - * @param actualResults Result of the executed Pig query - * @param expectedResults Expected results to validate against - */ - static public void checkQueryOutputs(Iterator<Tuple> actualResults, - Tuple[] expectedResults) { - - for (Tuple expected : expectedResults) { - Tuple actual = actualResults.next(); - Assert.assertEquals(expected, actual); - } - } + * Helper function to check if the result of a Pig Query is in line with + * expected results. + * + * @param actualResults Result of the executed Pig query + * @param expectedResults Expected results to validate against + */ + static public void checkQueryOutputs(Iterator<Tuple> actualResults, + Tuple[] expectedResults) { + for (Tuple expected : expectedResults) { + Tuple actual = actualResults.next(); + Assert.assertEquals(expected.toString(), actual.toString()); + } + } /** * Utility method to copy a file form local filesystem to the dfs on