Author: olga
Date: Fri May  2 13:06:31 2008
New Revision: 652887

URL: http://svn.apache.org/viewvc?rev=652887&view=rev
Log:
PIG-228: make multiple streaming outputs adhere to spec

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri May  2 13:06:31 2008
@@ -260,3 +260,5 @@
 
     PIG-226: fix for streaming optimization bug (acmurthy via olgan)
 
+    PIG-228: make multiple streaming outputs adhere to spec (acmurthy via olgan)
+

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri May  2 13:06:31 2008
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.text.NumberFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -45,6 +46,16 @@
  * of the managed process and also persists the logs of the tasks on HDFS. 
  */
 public class HadoopExecutableManager extends ExecutableManager {
+    // The part-<partition> file name, similar to Hadoop's outputs
+    private static final NumberFormat NUMBER_FORMAT = 
NumberFormat.getInstance();
+    static {
+      NUMBER_FORMAT.setMinimumIntegerDigits(5);
+      NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
+    static String getOutputName(int partition) {
+      return "part-" + NUMBER_FORMAT.format(partition);
+    }
 
     JobConf job;
     
@@ -122,10 +133,13 @@
                 for (int i=1; i < outputSpecs.size(); ++i) {
                     String fileName = outputSpecs.get(i).getName();
                     try {
+                        int partition = job.getInt("mapred.task.partition", 
-1);
                         fs.copyFromLocalFile(false, true, new Path(fileName), 
-                                new Path(scriptOutputDir, 
-                                        taskId+"-"+fileName)
-                        );
+                                             new Path(
+                                                     new Path(scriptOutputDir, 
+                                                              fileName), 
+                                                     getOutputName(partition))
+                                            );
                     } catch (IOException ioe) {
                         System.err.println("Failed to save secondary output '" 
+ 
                                            fileName + "' of task: " + taskId +

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Fri May  2 13:06:31 2008
@@ -324,9 +324,9 @@
                           "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open 
\".$ARGV[1].\"!: $!\";",
                           "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open 
\".$ARGV[2].\"!: $!\";",
                           "while (<STDIN>) {",
-                          "  print OUTFILE \"A,10\n\";",
+                          "  print OUTFILE \"$_\n\";",
                           "  print STDERR \"STDERR: $_\n\";",
-                          "  print OUTFILE2 \"Secondary Output: $_\n\";",
+                          "  print OUTFILE2 \"A,10\n\";",
                           "}",
                             };
            File command = Util.createInputFile("script", "pl", script);
@@ -354,7 +354,8 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+        InputStream op = FileLocalizer.open(output+"/bar", 
+                                            pigServer.getPigContext());
         PigStorage ps = new PigStorage(",");
         ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
         List<Tuple> outputs = new ArrayList<Tuple>();
@@ -388,7 +389,7 @@
                           "  chomp $_;",
                           "  print OUTFILE \"$_\n\";",
                           "  print STDERR \"STDERR: $_\n\";",
-                          "  print OUTFILE2 \"Secondary Output: $_\n\";",
+                          "  print OUTFILE2 \"$_\n\";",
                           "}",
                          };
         File command = Util.createInputFile("script", "pl", script);
@@ -417,7 +418,8 @@
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
-        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+        InputStream op = FileLocalizer.open(output+"/foobar", 
+                                            pigServer.getPigContext());
         PigStorage ps = new PigStorage(",");
         ps.bindTo("", new BufferedPositionedInputStream(op), 0, 
Long.MAX_VALUE); 
         List<Tuple> outputs = new ArrayList<Tuple>();


Reply via email to