Author: gates Date: Tue Apr 22 11:41:49 2008 New Revision: 650606 URL: http://svn.apache.org/viewvc?rev=650606&view=rev Log: PIG-216: Fix streaming to work with commands that use Unix pipes.
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=650606&r1=650605&r2=650606&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Tue Apr 22 11:41:49 2008 @@ -243,3 +243,6 @@ PIG-213: Remove non-static references to logger from data bags and tuples, as it causes significant overhead (vgeschel via gates). + + PIG-216 Fix streaming to work with commands that use unix pipes (acmurthy + via gates). Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=650606&r1=650605&r2=650606&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Apr 22 11:41:49 2008 @@ -236,18 +236,8 @@ cmdArgs.add("-c"); StringBuffer sb = new StringBuffer(); sb.append("exec "); - for (int i=0; i < argv.length; ++i) { - // Single-quote each component, however ensure that already - // quoted args are handled right - sb.append('\''); - - String arg = argv[i]; - if (arg.charAt(0) == '\'' && arg.charAt(arg.length()-1) == '\'') { - arg = arg.substring(1, arg.length()-1); - } - sb.append(arg); - - sb.append('\''); + for (String arg : argv) { + sb.append(arg); sb.append(" "); } cmdArgs.add(sb.toString()); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=650606&r1=650605&r2=650606&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Tue Apr 22 11:41:49 2008 @@ -432,4 +432,42 @@ // Cleanup pigServer.deleteFile(output); } + + @Test + public void testLocalSimpleMapSideStreamingWithUnixPipes() + throws Exception { + testSimpleMapSideStreamingWithUnixPipes(ExecType.LOCAL); + } + + @Test + public void testMRSimpleMapSideStreamingWithUnixPipes() throws Exception { + testSimpleMapSideStreamingWithUnixPipes(ExecType.MAPREDUCE); + } + + private void testSimpleMapSideStreamingWithUnixPipes(ExecType execType) + throws Exception { + PigServer pigServer = createPigServer(execType); + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"}; + int[] expectedSecondFields = new int[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + + // Pig query to run + pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand + + " | " + simpleEchoStreamingCommand + "`;"); + pigServer.registerQuery("IP = load 'file:" + input + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("OP = stream IP through CMD;"); + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + }