Author: olga
Date: Wed Jun 25 16:18:22 2008
New Revision: 671681

URL: http://svn.apache.org/viewvc?rev=671681&view=rev
Log:
PIG-272: streaming with intermediate store is broken

Modified:
    
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java

Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
 Wed Jun 25 16:18:22 2008
@@ -194,12 +194,7 @@
     private void writeDebugHeader() {
         processError("===== Task Information Header =====" );
 
-        StringBuffer sb = new StringBuffer();
-        for (String arg : command.getCommandArgs()) {
-            sb.append(arg);
-            sb.append(" ");
-        }
-        processError("\nCommand: " + sb.toString());
+        processError("\nCommand: " + command);
         processError("\nStart time: " + new Date(System.currentTimeMillis()));
         if (job.getBoolean("mapred.task.is.map", false)) {
             processError("\nInput-split file: " + job.get("map.input.file"));

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Wed Jun 25 
16:18:22 2008
@@ -28,6 +28,8 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.data.Datum;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -40,10 +42,13 @@
 
     private String executableManager;               // ExecutableManager to use
     private StreamingCommand command;               // Actual command to be run
+    private StreamingCommand originalCommand;       // Original command
+    private StreamingCommand optimizedCommand;      // Optimized command
 
     public StreamSpec(ExecutableManager executableManager, 
                       StreamingCommand command) {
         this.executableManager = executableManager.getClass().getName();
+        this.originalCommand = command;
         this.command = command;
 
         // Setup streaming-specific properties
@@ -75,12 +80,54 @@
 
     /**
      * Get the [EMAIL PROTECTED] StreamingCommand} for this 
<code>StreamSpec</code>.
-     * @return
+     * @return the [EMAIL PROTECTED] StreamingCommand} for this 
<code>StreamSpec</code>
      */
     public StreamingCommand getCommand() {
         return command;
     }
     
+    /**
+     * Set the optimized [EMAIL PROTECTED] HandleSpec} for the given [EMAIL 
PROTECTED] Handle} of the 
+     * <code>StreamSpec</code>.
+     * 
+     * @param handle <code>Handle</code> to optimize
+     * @param spec optimized specification for the handle
+     */ 
+    public void setOptimizedSpec(Handle handle, String spec) {
+        if (optimizedCommand == null) {
+            optimizedCommand = (StreamingCommand)command.clone();
+        }
+        
+        if (handle == Handle.INPUT) {
+            HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
+            streamInputSpec.setSpec(spec);
+        } else if (handle == Handle.OUTPUT) {
+            HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
+            streamOutputSpec.setSpec(spec);
+        }
+        
+        command = optimizedCommand;
+    }
+    
+    /**
+     * Revert the optimized [EMAIL PROTECTED] StreamingCommand} for this 
+     * <code>StreamSpec</code>.
+     */
+    public void revertOptimizedCommand(Handle handle) {
+        if (optimizedCommand == null) {
+            return;
+        }
+
+        if (handle == Handle.INPUT &&
+            !command.getInputSpec().equals(originalCommand.getInputSpec())) {
+            command.setInputSpec(originalCommand.getInputSpec());
+        } else if (handle == Handle.OUTPUT && 
+                   !command.getOutputSpec().equals(
+                           originalCommand.getOutputSpec())) {
+            command.setOutputSpec(originalCommand.getOutputSpec());
+        }
+    }
+    
     public List<String> getFuncs() {
         // No user-defined functions here
         return new ArrayList<String>();
@@ -101,6 +148,10 @@
         v.visitStream(this);
     }
 
+    public String toString() {
+      return command.toString();
+    }
+    
     /**
      * A simple [EMAIL PROTECTED] DataCollector} which wraps a [EMAIL 
PROTECTED] ExecutableManager}
      * and lets it handle the input and the output to the managed executable.

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
 Wed Jun 25 16:18:22 2008
@@ -37,6 +37,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
@@ -107,13 +108,16 @@
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     load.setInputFileSpec(new 
FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));
-                    
-                    streamInputSpec.setSpec(BinaryStorage.class.getName());
-                    command.setInputSpec(streamInputSpec);
-                    
+                    streamSpec.setOptimizedSpec(Handle.INPUT, 
+                                                   
BinaryStorage.class.getName());
                     optimize = true;
                 }
             }
+        } else {
+            if (e.getSpec() instanceof StreamSpec) {
+                StreamSpec streamSpec = (StreamSpec)e.getSpec();
+                streamSpec.revertOptimizedCommand(Handle.INPUT);
+            }
         }
         
         parentLoad = false;

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
 Wed Jun 25 16:18:22 2008
@@ -37,6 +37,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
@@ -62,6 +63,11 @@
         super.visitEval(e);
         eval = e;
         parentEval = true;
+
+        if (e.getSpec() instanceof StreamSpec) {
+            StreamSpec streamSpec = (StreamSpec)e.getSpec();
+            streamSpec.revertOptimizedCommand(Handle.OUTPUT);
+        }
     }
 
     public void visitLoad(LOLoad load) {
@@ -133,10 +139,8 @@
                     // Since they both are the same, we can flip them 
                     // for BinaryStorage
                     s.setOutputFileSpec(new 
FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));
-                    
-                    streamOutputSpec.setSpec(BinaryStorage.class.getName());
-                    command.setOutputSpec(streamOutputSpec);
-                    
+                    streamSpec.setOptimizedSpec(Handle.OUTPUT, 
+                                                   
BinaryStorage.class.getName());
                     optimize = true;
                 }
             }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
(original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java 
Wed Jun 25 16:18:22 2008
@@ -5,6 +5,8 @@
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -25,7 +27,7 @@
  * details such as input/output/error specifications and also files to be
  * shipped to the cluster and files to be cached.
  */
-public class StreamingCommand implements Serializable {
+public class StreamingCommand implements Serializable, Cloneable {
     private static final long serialVersionUID = 1L;
 
     // External command to be executed and it's parsed components
@@ -360,9 +362,40 @@
     }
 
     public String toString() {
-        return executable;
+        StringBuffer sb = new StringBuffer();
+        for (String arg : getCommandArgs()) {
+            sb.append(arg);
+            sb.append(" ");
+        }
+        sb.append("(" + getInputSpec().toString() + "/"+getOutputSpec() + ")");
+
+        return sb.toString();
     }
     
+    public Object clone() {
+      try {
+        StreamingCommand clone = (StreamingCommand)super.clone();
+
+        clone.shipSpec = new ArrayList<String>(shipSpec);
+        clone.cacheSpec = new ArrayList<String>(cacheSpec);
+        
+        clone.handleSpecs = new HashMap<Handle, List<HandleSpec>>();
+        for (Map.Entry<Handle, List<HandleSpec>> e : handleSpecs.entrySet()) {
+          List<HandleSpec> values = new ArrayList<HandleSpec>();
+          for (HandleSpec spec : e.getValue()) {
+            values.add((HandleSpec)spec.clone());
+          }
+          clone.handleSpecs.put(e.getKey(), values);
+        }
+
+        return clone;
+      } catch (CloneNotSupportedException cnse) {
+        // Shouldn't happen since we do implement Clonable
+        throw new InternalError(cnse.toString());
+      }
+    }
+
+
     /**
      * Specification about the usage of the [EMAIL PROTECTED] Handle} to 
communicate
      * with the external process.
@@ -373,7 +406,7 @@
      * to/from the stream.
      */
     public static class HandleSpec 
-    implements Comparable<HandleSpec>, Serializable {
+    implements Comparable<HandleSpec>, Serializable, Cloneable {
         private static final long serialVersionUID = 1L;
 
         String name;
@@ -408,7 +441,7 @@
         }
         
         public String toString() {
-            return name + " using " + spec;
+            return name + "-" + spec;
         }
 
         /**
@@ -450,5 +483,20 @@
         public void setSpec(String spec) {
             this.spec = spec;
         }
+        
+        public boolean equals(Object obj) {
+          HandleSpec other = (HandleSpec)obj;
+          return (name.equals(other.name) && spec.equals(other.spec));
+        }
+
+
+        public Object clone() {
+          try {
+            return super.clone();
+          } catch (CloneNotSupportedException cnse) {
+            // Shouldn't happen since we do implement Clonable
+            throw new InternalError(cnse.toString());
+          }
+        }
     }
 }
\ No newline at end of file


Reply via email to