Author: olga
Date: Thu Apr  3 11:40:33 2008
New Revision: 644437

URL: http://svn.apache.org/viewvc?rev=644437&view=rev
Log:
 PIG-174,180: bug fixes in streaming

Modified:
    incubator/pig/trunk/CHANGES.txt
    
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=644437&r1=644436&r2=644437&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Apr  3 11:40:33 2008
@@ -196,6 +196,7 @@
        PIG-122: Added build and src-gen to the list of ignore files in
        the top level directory (joa23 via gates).
 
-    PIG-94: M3 code update for streaming
+    PIG-94: M3 code update for streaming (arunc via olgan)
     
-    PIG-55: added custom splitter
+    PIG-55: added custom splitter (groves via olgan)
+    PIG-74,180: bug fixes in streaming (arunc via olgan)

Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=644437&r1=644436&r2=644437&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
 Thu Apr  3 11:40:33 2008
@@ -104,9 +104,9 @@
 
         oc = output;
 
-        setupMapPipe(properties, reporter);
-
         try {
+            setupMapPipe(properties, reporter);
+
             // allocate key & value instances that are re-used for all entries
             WritableComparable key = input.createKey();
             Writable value = input.createValue();
@@ -115,12 +115,15 @@
             }
         } finally {
             try {
-                evalPipe.finishPipe();  // EOF marker
-                evalPipe = null;
+                if (evalPipe != null) {
+                    evalPipe.finishPipe();  // EOF marker
+                    evalPipe = null;
+                }
             } finally {
                 // Close the writer
                 if (pigWriter != null) {
                     pigWriter.close(reporter);
+                    pigWriter = null;
                 }
             }
         }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=644437&r1=644436&r2=644437&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 Thu Apr  3 11:40:33 2008
@@ -69,12 +69,7 @@
                 // and input files are to be processed as-is
                 StreamSpec streamSpec = (StreamSpec)spec;
                 StreamingCommand command = streamSpec.getCommand();
-                List<HandleSpec> inputSpecs = 
-                    command.getHandleSpecs(Handle.INPUT); 
-                HandleSpec streamInputSpec = 
-                    (inputSpecs == null) ? 
-                            new HandleSpec("stdin" , "PigStorage()") :
-                            inputSpecs.get(0);
+                HandleSpec streamInputSpec = command.getInputSpec(); 
                 
                 FileSpec loadFileSpec = load.getInputFileSpec();
                 
@@ -91,7 +86,9 @@
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     load.setInputFileSpec(new 
FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));
+                    
                     streamInputSpec.setSpec(BinaryStorage.class.getName());
+                    command.setInputSpec(streamInputSpec);
                     
                     optimize = true;
                 }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=644437&r1=644436&r2=644437&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 Thu Apr  3 11:40:33 2008
@@ -94,12 +94,7 @@
                 // Try and optimize if the store and stream output specs match
                 StreamSpec streamSpec = (StreamSpec)spec;
                 StreamingCommand command = streamSpec.getCommand();
-                List<HandleSpec> outputSpecs = 
-                    command.getHandleSpecs(Handle.OUTPUT); 
-                HandleSpec streamOutputSpec = 
-                    (outputSpecs == null) ? 
-                            new HandleSpec("stdout" , "PigStorage()") :
-                            outputSpecs.get(0);
+                HandleSpec streamOutputSpec = command.getOutputSpec(); 
                 
                 FileSpec storeFileSpec = s.getOutputFileSpec();
                 
@@ -116,7 +111,9 @@
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     s.setOutputFileSpec(new 
FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));
+                    
                     streamOutputSpec.setSpec(BinaryStorage.class.getName());
+                    command.setOutputSpec(streamOutputSpec);
                     
                     optimize = true;
                 }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=644437&r1=644436&r2=644437&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
(original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
Thu Apr  3 11:40:33 2008
@@ -163,6 +163,64 @@
     }
     
     /**
+     * Set the input specification for the <code>StreamingCommand</code>.
+     * 
+     * @param spec input specification
+     */
+    public void setInputSpec(HandleSpec spec) {
+        List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+        if (inputSpecs == null || inputSpecs.size() == 0) {
+            addHandleSpec(Handle.INPUT, spec);
+        } else {
+            inputSpecs.set(0, spec);
+        }
+    }
+    
+    /**
+     * Get the input specification of the <code>StreamingCommand</code>.
+     * 
+     * @return input specification of the <code>StreamingCommand</code>
+     */
+    public HandleSpec getInputSpec() {
+        List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+        if (inputSpecs == null || inputSpecs.size() == 0) {
+            addHandleSpec(Handle.INPUT, new HandleSpec("stdin", 
PigStorage.class.getName()));
+        }
+        return getHandleSpecs(Handle.INPUT).get(0);        
+    }
+    
+    /**
+     * Set the specification for the primary output of the 
+     * <code>StreamingCommand</code>.
+     * 
+     * @param spec specification for the primary output of the 
+     *             <code>StreamingCommand</code>
+     */
+    public void setOutputSpec(HandleSpec spec) {
+        List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() == 0) {
+            addHandleSpec(Handle.OUTPUT, spec);
+        } else {
+            outputSpecs.set(0, spec);
+        }
+    }
+    
+    /**
+     * Get the specification of the primary output of the 
+     * <code>StreamingCommand</code>.
+     * 
+     * @return specification of the primary output of the 
+     *         <code>StreamingCommand</code>
+     */
+    public HandleSpec getOutputSpec() {
+        List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() == 0) {
+            addHandleSpec(Handle.OUTPUT, new HandleSpec("stdout", 
PigStorage.class.getName()));
+        }
+        return getHandleSpecs(Handle.OUTPUT).get(0);
+    }
+    
+    /**
      * Get specifications for the given <code>Handle</code>.
      * 
      * @param handle <code>Handle</code> of the stream


Reply via email to