Author: olga
Date: Thu Mar  6 11:11:26 2008
New Revision: 634380

URL: http://svn.apache.org/viewvc?rev=634380&view=rev
Log:
PIG-94: M1 for streaming: maps and reduce side support with default
    (de)serializer (acmurthy via olgan)

Added:
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/test/org/apache/pig/test/Util.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar  6 11:11:26 2008
@@ -157,3 +157,6 @@
        PIG-120:  Support map reduce in local mode.  To do this user needs to
        specify execution type as mapreduce and cluster name as local (joa23 via
        gates).
+
+    PIG-94: M1 for streaming: maps and reduce side support with default
+    (de)serializer (acmurthy via olgan)

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java 
(original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java Thu 
Mar  6 11:11:26 2008
@@ -143,7 +143,12 @@
     public void visitTupleWindow(TupleWindowSpec tw) {
     }
 
-
+    /**
+     * Only StreamSpec.visit() and subclass implementations of this function 
+     * should ever call this method. 
+     */
+    public void visitStream(StreamSpec stream) {
+    }
 
 }
 

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Thu Mar  6 
11:11:26 2008
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.streaming.PigExecutableManager;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class StreamSpec extends EvalSpec {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = 
+        LogFactory.getLog(StreamSpec.class.getName());
+
+    private String streamingCommand;                       // Actual command 
to be run
+    private String deserializer;                           // LoadFunc to be 
used
+    private String serializer;                             // StoreFunc to be 
used
+
+    public StreamSpec(String streamingCommand) {
+        this.streamingCommand = streamingCommand;
+        this.deserializer = PigStorage.class.getName();
+        this.serializer = PigStorage.class.getName();
+    }
+
+    public StreamSpec(String streamingCommand, 
+                      String deserializer, String serializer) {
+        this.streamingCommand = streamingCommand;
+        this.deserializer = deserializer;
+        this.serializer = serializer;
+    }
+    
+    @Override
+    public List<String> getFuncs() {
+        // No user-defined functions here
+        return new ArrayList<String>();
+    }
+
+    protected Schema mapInputSchema(Schema schema) {
+        // EvalSpec _has_ to have the schema if there is one...
+        return null;
+    }
+
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new StreamDataCollector(streamingCommand,
+                                       (deserializer == null) ? new 
PigStorage() :
+                                         
(LoadFunc)PigContext.instantiateFuncFromSpec(
+                                                                        
deserializer),            
+                                       (serializer == null) ? new PigStorage() 
:
+                                         
(StoreFunc)PigContext.instantiateFuncFromSpec(
+                                                                        
serializer),
+                                      endOfPipe);
+    }
+
+    public void visit(EvalSpecVisitor v) {
+        v.visitStream(this);
+    }
+
+    /**
+     * A simple {@link DataCollector} which wraps a {@link PigExecutableManager}
+     * and lets it handle the input and the output to the managed executable.
+     */
+    private static class StreamDataCollector extends DataCollector {
+        PigExecutableManager executable;                          //Executable 
manager
+        
+        public StreamDataCollector(String streamingCommand,
+                                   LoadFunc deserializer, StoreFunc serializer,
+                                   DataCollector endOfPipe) {
+            super(endOfPipe);
+
+            DataCollector successor = 
+                new DataCollector(endOfPipe) {
+                public void add(Datum d) {
+                    // Just forward the data to the next EvalSpec in the 
pipeline
+                    addToSuccessor(d);
+                }
+            };
+
+            try {
+                // Create the PigExecutableManager
+                executable = new PigExecutableManager(streamingCommand, 
+                                                      deserializer, 
serializer, 
+                                                      successor);
+                
+                executable.configure();
+                
+                // Start the executable
+                executable.run();
+            } catch (Exception e) {
+                LOG.fatal("Failed to create/start PigExecutableManager with: " 
+ e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void add(Datum d) {
+            try {
+                executable.add(d);
+            } catch (IOException ioe) {
+                LOG.fatal("executable.add(" + d + ") failed with: " + ioe);
+                throw new RuntimeException(ioe);
+            }
+        }
+
+        protected void finish() {
+            try {
+                executable.close();
+            }
+            catch (Exception e) {
+                LOG.fatal("Failed to close PigExecutableManager with: " + e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Thu Mar  6 11:11:26 2008
@@ -298,6 +298,9 @@
 TOKEN : { <GENERATE : "generate"> }
 TOKEN : { <FLATTEN : "flatten"> }
 TOKEN : { <EVAL : "eval"> }
+TOKEN : { <STREAM : "stream"> }
+TOKEN : { <THROUGH : "through"> }
+TOKEN : { <BACKTICK : "`"> }
 
 TOKEN:
 {
@@ -316,6 +319,7 @@
 }
 
 TOKEN : { <QUOTEDSTRING : "'" (~["'"])* "'"> }
+TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
 // Pig has special variables starting with $
 TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
 
@@ -411,6 +415,7 @@
 |   (<JOIN> op = JoinClause())
 |      (<UNION> op = UnionClause())
 |      (<FOREACH> op = ForEachClause())
+|   (<STREAM> op = StreamClause() [<AS> schema = SchemaTuple() 
{op.setSchema(schema);} ])
        )
     [<PARALLEL> t2=<NUMBER> { 
op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
        )       
@@ -1126,3 +1131,22 @@
                return item;
        }
 }
+
+LogicalOperator StreamClause(): {LogicalOperator input; String 
streamingCommand;}
+{
+       input = NestedExpr()    
+       
+       <THROUGH> streamingCommand = StreamingCommand()
+       {
+               return new LOEval(opTable, scope, getNextId(), 
input.getOperatorKey(), new StreamSpec(streamingCommand));
+       }
+}
+
+String StreamingCommand(): {Token t;}
+{
+       t = <EXECCOMMAND>
+       {
+               return unquote(t.image);
+       }
+}
+

Added: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java?rev=634380&view=auto
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java 
(added)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java 
Thu Mar  6 11:11:26 2008
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
+/**
+ * {@link PigExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ * 
+ * The <code>PigExecutableManager</code> is responsible for startup/teardown of the
+ * external process and also for managing it.
+ * It feeds input records to the executable via its <code>stdin</code>,
+ * collects the output records from the <code>stdout</code> and also diagnostic
+ * information from the <code>stderr</code>.
+ */
+public class PigExecutableManager {
+       private static final Log LOG = 
+               LogFactory.getLog(PigExecutableManager.class.getName());
+
+       String command;                            // Streaming command to be 
run
+       String[] argv;                             // Parsed/split commands 
+
+       Process process;                           // Handle to the process
+       private static int SUCCESS = 0;
+       
+       protected DataOutputStream stdin;          // stdin of the process
+       StoreFunc serializer;                      // serializer to be used to
+                                                  // send data to the process
+       
+       ProcessOutputThread stdoutThread;          // thread to get process 
output
+       InputStream stdout;                        // stdout of the process
+       LoadFunc deserializer;                     // deserializer to be used to
+                                                  // interpret the process' 
output
+       
+       ProcessErrorThread stderrThread;           // thread to get process 
output
+       InputStream stderr;                        // stderr of the process
+       
+       DataCollector endOfPipe;
+
+       public PigExecutableManager(String command, 
+                                           LoadFunc deserializer, StoreFunc 
serializer, 
+                                           DataCollector endOfPipe) throws 
Exception {
+               this.command = command;
+               
+               this.argv = splitArgs(this.command);
+               if (LOG.isDebugEnabled()) {
+                   for (String cmd : argv) {
+                       LOG.debug("argv: " + cmd);
+                   }
+               }
+               
+               this.deserializer = deserializer;
+               this.serializer = serializer;
+               this.endOfPipe = endOfPipe;
+       }
+
+       private static final char SINGLE_QUOTE = '\'';
+       private static final char DOUBLE_QUOTE = '"';
+       private static String[] splitArgs(String command) throws Exception {
+               List<String> argv = new ArrayList<String>();
+
+               int beginIndex = 0;
+               
+               while (beginIndex < command.length()) {
+                       // Skip spaces
+                   while (Character.isWhitespace(command.charAt(beginIndex))) {
+                       ++beginIndex;
+                   }
+                       
+                       char delim = ' ';
+                       char charAtIndex = command.charAt(beginIndex);
+                       if (charAtIndex == SINGLE_QUOTE || charAtIndex == 
DOUBLE_QUOTE) {
+                               delim = charAtIndex;
+                       }
+                       
+                       int endIndex = command.indexOf(delim, beginIndex+1);
+                       if (endIndex == -1) {
+                               if (Character.isWhitespace(delim)) {
+                                       // Reached end of command-line
+                                       argv.add(command.substring(beginIndex));
+                                       break;
+                               } else {
+                                       // Didn't find the ending 
quote/double-quote
+                                       throw new ParseException("Illegal 
command: " + command);
+                               }
+                       }
+                       
+                       if (Character.isWhitespace(delim)) {
+                               // Do not consume the space
+                               argv.add(command.substring(beginIndex, 
endIndex));
+                       } else {
+                               // Do not consume the quotes
+                               argv.add(command.substring(beginIndex+1, 
endIndex));
+                       }
+                       
+                       beginIndex = endIndex + 1;
+               }
+               
+               return argv.toArray(new String[0]);
+       }
+
+       public void configure() {
+       }
+
+       public void close() throws Exception {
+           // Close the stdin to let the process terminate
+               stdin.flush();
+               stdin.close();
+               stdin = null;
+               
+               // Wait for the process to exit and the stdout/stderr threads 
to complete
+               int exitCode = -1;
+               try {
+                       exitCode = process.waitFor();
+                       
+                       if (stdoutThread != null) {
+                               stdoutThread.join(0);
+                       }
+                       if (stderrThread != null) {
+                               stderrThread.join(0);
+                       }
+
+               } catch (InterruptedException ie) {}
+
+               // Clean up the process
+               process.destroy();
+               
+        LOG.debug("Process exited with: " + exitCode);
+        if (exitCode != SUCCESS) {
+            throw new ExecException(command + " failed with exit status: " + 
+                                       exitCode);
+        }
+       }
+
+       public void run() throws IOException {
+               // Run the executable
+               ProcessBuilder processBuilder = new ProcessBuilder(argv);
+               process = processBuilder.start();
+               LOG.debug("Started the process for command: " + command);
+
+               // Pick up the process' stdin/stdout/stderr streams
+               stdin = 
+                       new DataOutputStream(new 
BufferedOutputStream(process.getOutputStream()));
+               stdout = 
+                       new DataInputStream(new 
BufferedInputStream(process.getInputStream()));
+               stderr = 
+                       new DataInputStream(new 
BufferedInputStream(process.getErrorStream()));
+
+               // Attach the serializer to the stdin of the process for 
sending tuples 
+               serializer.bindTo(stdin);
+               
+               // Attach the deserializer to the stdout of the process to get 
tuples
+               deserializer.bindTo("", new 
BufferedPositionedInputStream(stdout), 0, 
+                                           Long.MAX_VALUE);
+               
+               // Start the threads to process the executable's stdout and 
stderr
+               stdoutThread = new ProcessOutputThread(deserializer);
+               stdoutThread.start();
+               stderrThread = new ProcessErrorThread();
+               stderrThread.start();
+       }
+
+       public void add(Datum d) throws IOException {
+               // Pass the serialized tuple to the executable
+               serializer.putNext((Tuple)d);
+               stdin.flush();
+       }
+
+       /**
+        * Workhorse to process the stdout of the managed process.
+        * 
+        * The <code>PigExecutableManager</code>, by default, just pushes the 
received
+        * <code>Datum</code> into eval-pipeline to be processed by the 
successor.
+        * 
+        * @param d <code>Datum</code> to process
+        */
+       protected void processOutput(Datum d) {
+               endOfPipe.add(d);
+       }
+       
+       class ProcessOutputThread extends Thread {
+
+               LoadFunc deserializer;
+
+               ProcessOutputThread(LoadFunc deserializer) {
+                       setDaemon(true);
+                       this.deserializer = deserializer;
+               }
+
+               public void run() {
+                       try {
+                               // Read tuples from the executable and push 
them down the pipe
+                               Tuple tuple = null;
+                               while ((tuple = deserializer.getNext()) != 
null) {
+                                       processOutput(tuple);
+                               }
+
+                               if (stdout != null) {
+                                       stdout.close();
+                                       LOG.debug("ProcessOutputThread done");
+                               }
+                       } catch (Throwable th) {
+                               LOG.warn(th);
+                               try {
+                                       if (stdout != null) {
+                                               stdout.close();
+                                       }
+                               } catch (IOException ioe) {
+                                       LOG.info(ioe);
+                               }
+                               throw new RuntimeException(th);
+                       }
+               }
+       }
+
+       /**
+        * Workhorse to process the stderr stream of the managed process.
+        * 
+        * By default <code>PigExecutableManager</code> just sends out the received
+        * error message to the <code>stderr</code> of itself.
+        * 
+        * @param error error message from the managed process.
+        */
+       protected void processError(String error) {
+               // Just send it out to our stderr
+               System.err.println(error);
+       }
+       
+       class ProcessErrorThread extends Thread {
+
+               public ProcessErrorThread() {
+                       setDaemon(true);
+               }
+
+               public void run() {
+                       try {
+                               String error;
+                               BufferedReader reader = 
+                                       new BufferedReader(new 
InputStreamReader(stderr));
+                               while ((error = reader.readLine()) != null) {
+                                       processError(error);
+                               }
+
+                               if (stderr != null) {
+                                       stderr.close();
+                                       LOG.debug("ProcessErrorThread done");
+                               }
+                       } catch (Throwable th) {
+                               LOG.warn(th);
+                               try {
+                                       if (stderr != null) {
+                                               stderr.close();
+                                       }
+                               } catch (IOException ioe) {
+                                       LOG.info(ioe);
+                       throw new RuntimeException(th);
+                               }
+                       }
+               }
+       }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu Mar  6 
11:11:26 2008
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.*;
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+
+import junit.framework.TestCase;
+
+public class TestStreaming extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+       private static final String simpleEchoStreamingCommand = 
+               "perl -ne 'chomp $_; print \"$_\n\"'";
+
+       private Tuple[] setupExpectedResults(String[] firstField, int[] 
secondField) {
+               Assert.assertEquals(firstField.length, secondField.length);
+               
+               Tuple[] expectedResults = new Tuple[firstField.length];
+               for (int i=0; i < expectedResults.length; ++i) {
+                       expectedResults[i] = new Tuple(2);
+                       expectedResults[i].setField(0, firstField[i]);
+                       expectedResults[i].setField(1, secondField[i]);
+               }
+               
+               return expectedResults;
+       }
+       
+       @Test
+       public void testSimpleMapSideStreaming() throws Exception {
+               PigServer pigServer = new PigServer(MAPREDUCE);
+
+               File input = Util.createInputFile("tmp", "", 
+                                                         new String[] {"A,1", 
"B,2", "C,3", "D,2",
+                                                                       "A,5", 
"B,5", "C,8", "A,8",
+                                                                       "D,8", 
"A,9"});
+
+               // Expected results
+               String[] expectedFirstFields = new String[] {"A", "B", "C", 
"A", "D", "A"};
+               int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+               Tuple[] expectedResults = 
+                       setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
+
+               // Pig query to run
+               pigServer.registerQuery("INPUT = load 'file:" + input + "' 
using " + 
+                                               PigStorage.class.getName() + 
"(',');");
+               pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > 
'3';");
+               pigServer.registerQuery("OUTPUT = stream FILTERED_DATA through 
`" +
+                                               simpleEchoStreamingCommand + 
"`;");
+               
+               // Run the query and check the results
+               Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), 
expectedResults);
+       }
+
+       @Test
+       public void testSimpleMapSideStreamingWithOutputSchema() throws 
Exception {
+               PigServer pigServer = new PigServer(MAPREDUCE);
+
+               File input = Util.createInputFile("tmp", "", 
+                                                         new String[] {"A,1", 
"B,2", "C,3", "D,2",
+                                                                       "A,5", 
"B,5", "C,8", "A,8",
+                                                                       "D,8", 
"A,9"});
+
+               // Expected results
+               String[] expectedFirstFields = new String[] {"C", "A", "D", 
"A"};
+               int[] expectedSecondFields = new int[] {8, 8, 8, 9};
+               Tuple[] expectedResults = 
+                       setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
+
+               // Pig query to run
+               pigServer.registerQuery("INPUT = load 'file:" + input + "' 
using " + 
+                                               PigStorage.class.getName() + 
"(',');");
+               pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > 
'3';");
+               pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA 
through `" +
+                                               simpleEchoStreamingCommand + "` 
as (f0, f1);");
+               pigServer.registerQuery("OUTPUT = filter STREAMED_DATA by f1 > 
'6';");
+               
+               // Run the query and check the results
+               Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), 
expectedResults);
+       }
+
+       @Test
+       public void testSimpleReduceSideStreamingAfterFlatten() throws 
Exception {
+               PigServer pigServer = new PigServer(MAPREDUCE);
+
+               File input = Util.createInputFile("tmp", "", 
+                                                         new String[] {"A,1", 
"B,2", "C,3", "D,2",
+                                                                       "A,5", 
"B,5", "C,8", "A,8",
+                                                                       "D,8", 
"A,9"});
+
+               // Expected results
+               String[] expectedFirstFields = new String[] {"A", "A", "A", 
"B", "C", "D"};
+               int[] expectedSecondFields = new int[] {5, 8, 9, 5, 8, 8};
+               Tuple[] expectedResults = 
+                       setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
+
+               // Pig query to run
+               pigServer.registerQuery("INPUT = load 'file:" + input + "' 
using " + 
+                                               PigStorage.class.getName() + 
"(',');");
+               pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > 
'3';");
+               pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by 
$0;");
+               pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach 
GROUPED_DATA " +
+                                               "generate flatten($1);");
+               pigServer.registerQuery("OUTPUT = stream FLATTENED_GROUPED_DATA 
through `" +
+                                               simpleEchoStreamingCommand + 
"`;");
+               
+               // Run the query and check the results
+               Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), 
expectedResults);
+       }
+
+       @Test
+       public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws 
Exception {
+               PigServer pigServer = new PigServer(MAPREDUCE);
+
+               File input = Util.createInputFile("tmp", "", 
+                                                         new String[] 
{"A,1,2,3", "B,2,4,5",
+                                                                       
"C,3,1,2", "D,2,5,2",
+                                                                       
"A,5,5,1", "B,5,7,4",
+                                                                       
"C,8,9,2", "A,8,4,5",
+                                                                       
"D,8,8,3", "A,9,2,5"}
+                                                );
+
+               // Expected results
+               String[] expectedFirstFields = 
+                       new String[] {"A", "A", "A", "A", "B", "B", "C", "C", 
"D", "D"};
+               int[] expectedSecondFields = new int[] {1, 9, 8, 5, 2, 5, 3, 8, 
2, 8};
+               int[] expectedThirdFields = new int[] {2, 2, 4, 5, 4, 7, 1, 9, 
5, 8};
+               int[] expectedFourthFields = new int[] {3, 5, 5, 1, 5, 4, 2, 2, 
2, 3};
+               Tuple[] expectedResults = new Tuple[10];
+               for (int i = 0; i < expectedResults.length; ++i) {
+                       expectedResults[i] = new Tuple(4);
+                       expectedResults[i].setField(0, expectedFirstFields[i]);
+                       expectedResults[i].setField(1, expectedSecondFields[i]);
+                       expectedResults[i].setField(2, expectedThirdFields[i]);
+                       expectedResults[i].setField(3, expectedFourthFields[i]);
+               }
+                       setupExpectedResults(expectedFirstFields, 
expectedSecondFields);
+
+               // Pig query to run
+               pigServer.registerQuery("INPUT = load 'file:" + input + "' 
using " + 
+                                               PigStorage.class.getName() + 
"(',');");
+               pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > 
'3';");
+               pigServer.registerQuery("GROUPED_DATA = group INPUT by $0;");
+               pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { 
" +
+                                               "  D = order INPUT BY $2, $3;" +
+                                "  generate flatten(D);" +
+                                "};");
+               pigServer.registerQuery("OUTPUT = stream ORDERED_DATA through 
`" +
+                                               simpleEchoStreamingCommand + 
"`;");
+               
+               // Run the query and check the results
+               Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), 
expectedResults);
+       }
+       
+}

Modified: incubator/pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/Util.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/Util.java Thu Mar  6 11:11:26 
2008
@@ -17,9 +17,13 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
 
 import org.apache.pig.data.*;
+import org.junit.Assert;
 
 public class Util {
     // Helper Functions
@@ -68,4 +72,43 @@
         }
         return t;
     }
+
+    /**
+     * Helper to create a temporary file with given input data for use in test 
cases.
+     *  
+     * @param tmpFilenamePrefix file-name prefix
+     * @param tmpFilenameSuffix file-name suffix
+     * @param inputData input for test cases, each string in inputData[] is 
written
+     *                  on one line
+     * @return {@link File} handle to the created temporary file
+     * @throws IOException
+     */
+       static public File createInputFile(String tmpFilenamePrefix, 
+                                                  String tmpFilenameSuffix, 
+                                                  String[] inputData) 
+       throws IOException {
+               File f = File.createTempFile(tmpFilenamePrefix, 
tmpFilenameSuffix);
+               PrintWriter pw = new PrintWriter(f);
+               for (int i=0; i<inputData.length; i++){
+                       pw.println(inputData[i]);
+               }
+               pw.close();
+               return f;
+       }
+
+       /**
+        * Helper function to check if the result of a Pig Query is in line 
with 
+        * expected results.
+        * 
+        * @param actualResults Result of the executed Pig query
+        * @param expectedResults Expected results to validate against
+        */
+       static public void checkQueryOutputs(Iterator<Tuple> actualResults, 
+                                               Tuple[] expectedResults) {
+               for (Tuple expected : expectedResults) {
+                       Tuple actual = actualResults.next();
+                       Assert.assertEquals(expected, actual);
+               }
+       }
+
 }


Reply via email to