Author: olga
Date: Mon Sep 15 13:11:12 2008
New Revision: 695603

URL: http://svn.apache.org/viewvc?rev=695603&view=rev
Log:
streaming merge from trunk

Added:
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestBinaryStorage.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
 Mon Sep 15 13:11:12 2008
@@ -30,8 +30,6 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.streaming.StreamingCommand.Handle;
-import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -45,8 +43,6 @@
     private String executableManagerStr;            // String representing 
ExecutableManager to use
     private ExecutableManager executableManager;    // ExecutableManager to 
use 
     private StreamingCommand command;               // Actual command to be run
-    private StreamingCommand originalCommand;       // Original command
-    private StreamingCommand optimizedCommand;      // Optimized command
     private Properties properties;
 
     private boolean initialized = false;
@@ -63,7 +59,6 @@
                       StreamingCommand command, Properties properties) {
         super(k);
         this.executableManagerStr = executableManager.getClass().getName();
-        this.originalCommand = command;
         this.command = command;
         this.properties = properties;
 
@@ -115,47 +110,6 @@
         return command;
     }
     
-    /**
-     * Set the optimized [EMAIL PROTECTED] HandleSpec} for the given [EMAIL 
PROTECTED] Handle} of the 
-     * <code>StreamSpec</code>.
-     * 
-     * @param handle <code>Handle</code> to optimize
-     * @param spec optimized specification for the handle
-     */ 
-    public void setOptimizedSpec(Handle handle, String spec) {
-        if (optimizedCommand == null) {
-            optimizedCommand = (StreamingCommand)command.clone();
-        }
-        
-        if (handle == Handle.INPUT) {
-            HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
-            streamInputSpec.setSpec(spec);
-        } else if (handle == Handle.OUTPUT) {
-            HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
-            streamOutputSpec.setSpec(spec);
-        }
-        
-        command = optimizedCommand;
-    }
-    
-    /**
-     * Revert the optimized [EMAIL PROTECTED] StreamingCommand} for this 
-     * <code>StreamSpec</code>.
-     */
-    public void revertOptimizedCommand(Handle handle) {
-        if (optimizedCommand == null) {
-            return;
-        }
-
-        if (handle == Handle.INPUT &&
-            !command.getInputSpec().equals(originalCommand.getInputSpec())) {
-            command.setInputSpec(originalCommand.getInputSpec());
-        } else if (handle == Handle.OUTPUT && 
-                   !command.getOutputSpec().equals(
-                           originalCommand.getOutputSpec())) {
-            command.setOutputSpec(originalCommand.getOutputSpec());
-        }
-    }
     
     /* (non-Javadoc)
      * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)

Modified: 
incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Mon 
Sep 15 13:11:12 2008
@@ -312,4 +312,20 @@
         }
         return baos.toByteArray();
     }
+
+    /**
+     * Every <code>BinStorage</code> is considered equal to every other one
+     * (NOTE(review): presumably because it has no per-instance format
+     * options - confirm); objects of any other type are never equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof BinStorage;
+    }
+
+    @Override
+    public int hashCode() {
+        // Must agree with equals(): all instances hash alike
+        return BinStorage.class.hashCode();
+    }
 }

Added: 
incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java?rev=695603&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java 
(added)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java 
Mon Sep 15 13:11:12 2008
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * {@link BinaryStorage} is a simple, as-is, serializer/deserializer pair.
+ *
+ * It is a {@link LoadFunc} which reads the data from the given
+ * {@link InputStream} without interpreting it, returning it in chunks of at
+ * most <code>bufferSize</code> bytes, each wrapped in a single-field
+ * {@link Tuple}, and a {@link StoreFunc} which writes out the bytes of the
+ * first field of each <code>Tuple</code> it is given.
+ *
+ * <code>BinaryStorage</code> is intended to work in cases where input files
+ * are to be sent in-whole for processing without any splitting and
+ * interpretation of their data.
+ */
+public class BinaryStorage extends Utf8StorageConverter implements LoadFunc, StoreFunc {
+    // LoadFunc
+    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
+    protected int bufferSize = DEFAULT_BUFFER_SIZE;
+
+    protected BufferedPositionedInputStream in = null;
+    protected long offset = 0;
+    protected long end = Long.MAX_VALUE;
+
+    // StoreFunc
+    OutputStream out;
+
+    /**
+     * Create a <code>BinaryStorage</code> with default buffer size for reading
+     * inputs.
+     */
+    public BinaryStorage() {}
+
+    /**
+     * Create a <code>BinaryStorage</code> with the given buffer-size for
+     * reading inputs.
+     *
+     * @param bufferSize buffer size to be used; must be positive
+     * @throws IllegalArgumentException if <code>bufferSize</code> is not positive
+     */
+    public BinaryStorage(int bufferSize) {
+        if (bufferSize <= 0) {
+            throw new IllegalArgumentException(
+                "bufferSize must be positive, got " + bufferSize);
+        }
+        this.bufferSize = bufferSize;
+    }
+
+    public void bindTo(String fileName, BufferedPositionedInputStream in,
+            long offset, long end) throws IOException {
+        this.in = in;
+        this.offset = offset;
+        this.end = end;
+    }
+
+    /**
+     * Read the next chunk of at most <code>bufferSize</code> bytes from the
+     * bound input and wrap it, unmodified, in a single-field {@link Tuple}.
+     *
+     * @return a tuple holding one {@link DataByteArray}, or <code>null</code>
+     *         if no input is bound, the read position is past
+     *         <code>end</code>, or no more bytes are available
+     */
+    public Tuple getNext() throws IOException {
+        // Sanity check
+        if (in == null || in.getPosition() > end) {
+            return null;
+        }
+
+        // Copy data into the buffer until it is full or the stream ends
+        byte[] buffer = new byte[bufferSize];
+        int off = 0;
+        int len = bufferSize;
+        int n = 0;
+        while (len > 0 && (n = in.read(buffer, off, len)) != -1) {
+            off += n;
+            len -= n;
+        }
+
+        if (n == -1) {
+            // Copy out the part-buffer so no trailing zero bytes are sent
+            byte[] copy = new byte[off];
+            System.arraycopy(buffer, 0, copy, 0, copy.length);
+            buffer = copy;
+        }
+
+        // Create a new Tuple with one DataByteArray field and return it;
+        // ensure that we return 'null' if we didn't get any data
+        if (off > 0) {
+            return DefaultTupleFactory.getInstance().newTuple(new DataByteArray(buffer));
+        }
+
+        return null;
+    }
+
+    public void bindTo(OutputStream out) throws IOException {
+        this.out = out;
+    }
+
+    public void finish() throws IOException {}
+
+    /**
+     * Write the raw bytes held in the first field of <code>f</code>, which is
+     * expected to be a {@link DataByteArray}, to the bound output stream.
+     * A <code>null</code> or empty field writes nothing.
+     *
+     * @param f tuple whose first field holds the bytes to write
+     * @throws IOException if the field cannot be read or the write fails
+     */
+    public void putNext(Tuple f) throws IOException {
+        DataByteArray field;
+        try {
+            field = (DataByteArray)f.get(0);
+        } catch (ExecException e) {
+            IOException ioe = new IOException("Unable to get field out of tuple");
+            ioe.initCause(e);
+            throw ioe;
+        }
+        byte[] data = (field == null) ? null : field.get();
+        if (data != null && data.length > 0) {
+            out.write(data);
+            out.flush();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BinaryStorage(" + bufferSize + ")";
+    }
+
+    /**
+     * All <code>BinaryStorage</code> instances are equal to one another,
+     * regardless of buffer size (which only affects how loaded data is
+     * chunked); objects of any other type are never equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof BinaryStorage;
+    }
+
+    @Override
+    public int hashCode() {
+        // Must agree with equals(): every instance hashes alike
+        return BinaryStorage.class.hashCode();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.net.URL)
+     */
+    public Schema determineSchema(URL fileName) throws IOException {
+        // Raw bytes carry no schema information
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+     */
+    public void fieldsToRead(Schema schema) {
+        // Nothing to project: each tuple holds a single opaque field
+    }
+}

Modified: 
incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Mon 
Sep 15 13:11:12 2008
@@ -224,5 +224,32 @@
     public void fieldsToRead(Schema schema) {
         // do nothing
     }
+
+    /**
+     * Two <code>PigStorage</code> instances are equal when they use the same
+     * field delimiter. <code>null</code> and objects of any other type are
+     * never equal (rather than causing an exception).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return (obj instanceof PigStorage) && equals((PigStorage)obj);
+    }
+
+    /**
+     * Compare the field delimiter with that of another <code>PigStorage</code>.
+     *
+     * @param other storage to compare with; may be <code>null</code>
+     * @return <code>true</code> if <code>other</code> uses the same delimiter
+     */
+    public boolean equals(PigStorage other) {
+        return other != null && this.fieldDel == other.fieldDel;
+    }
+
+    @Override
+    public int hashCode() {
+        // Derived only from the delimiter, so it is consistent with equals()
+        return String.valueOf(fieldDel).hashCode();
+    }
+
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Mon 
Sep 15 13:11:12 2008
@@ -135,6 +135,14 @@
         }
         
         executionEngine = null;
+        
+        // Add the default paths to be skipped for auto-shipping of commands
+        skippedShipPaths.add("/bin");
+        skippedShipPaths.add("/usr/bin");
+        skippedShipPaths.add("/usr/local/bin");
+        skippedShipPaths.add("/sbin");
+        skippedShipPaths.add("/usr/sbin");
+        skippedShipPaths.add("/usr/local/sbin");
     }
 
     static{

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java 
Mon Sep 15 13:11:12 2008
@@ -78,6 +78,33 @@
     public FileSpec getInputFile() {
         return mInputFileSpec;
     }
+
+    /**
+     * Replace the input file specification and re-instantiate the load
+     * function it names. If instantiation fails, neither the spec nor the
+     * load function is changed.
+     *
+     * @param inputFileSpec new input file and load function specification
+     * @throws IOException if the load function cannot be instantiated or does
+     *         not implement {@link LoadFunc}; the original exception is
+     *         attached as the cause
+     */
+    public void setInputFile(FileSpec inputFileSpec) throws IOException {
+        try {
+            mLoadFunc = (LoadFunc)
+                PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+        } catch (ClassCastException cce) {
+            log.error(inputFileSpec.getFuncSpec() + " should implement the LoadFunc interface.");
+            IOException ioe = new IOException(cce.getMessage());
+            ioe.initCause(cce);
+            throw ioe;
+        } catch (Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
+        }
+        mInputFileSpec = inputFileSpec;
+    }
 
     public URL getSchemaFile() {
         return mSchemaFile;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
 Mon Sep 15 13:11:12 2008
@@ -81,8 +81,8 @@
             sb.append(depthFirst(leaf));
             sb.append("\n");
         }
-        sb.delete(sb.length() - "\n".length(), sb.length());
-        sb.delete(sb.length() - "\n".length(), sb.length());
+        //sb.delete(sb.length() - "\n".length(), sb.length());
+        //sb.delete(sb.length() - "\n".length(), sb.length());
         return sb.toString();
     }
     

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java 
Mon Sep 15 13:11:12 2008
@@ -71,6 +71,33 @@
     public FileSpec getOutputFile() {
         return mOutputFile;
     }
+
+    /**
+     * Replace the output file specification and re-instantiate the store
+     * function it names. If instantiation fails, neither the spec nor the
+     * store function is changed.
+     *
+     * @param outputFileSpec new output file and store function specification
+     * @throws IOException if the store function cannot be instantiated or
+     *         does not implement {@link StoreFunc}; the original exception
+     *         is attached as the cause
+     */
+    public void setOutputFile(FileSpec outputFileSpec) throws IOException {
+        try {
+            mStoreFunc = (StoreFunc)
+                PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
+        } catch (ClassCastException cce) {
+            IOException ioe = new IOException(outputFileSpec.getFuncSpec()
+                + " should implement the StoreFunc interface.");
+            ioe.initCause(cce);
+            throw ioe;
+        } catch (Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
+        }
+        mOutputFile = outputFileSpec;
+    }
 
     public StoreFunc getStoreFunc() {
         return mStoreFunc;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java 
Mon Sep 15 13:11:12 2008
@@ -8,6 +8,8 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
  * [EMAIL PROTECTED] LOStream} represents the specification of an external
@@ -27,8 +29,7 @@
     private static final long serialVersionUID = 2L;
     // the StreamingCommand object for the
     // Stream Operator this operator represents
-    private StreamingCommand StrCmd;
-    //private LogicalOperator input;
+    private StreamingCommand command;
     private ExecutableManager executableManager;
     /**
      * Create a new <code>LOStream</code> with the given command.
@@ -41,7 +42,7 @@
     public LOStream(LogicalPlan plan, OperatorKey k, LogicalOperator input, 
ExecutableManager exeManager, StreamingCommand cmd) {
         super(plan, k);
         //this.input = input;
-        this.StrCmd = cmd;
+        this.command = cmd;
         this.executableManager = exeManager;
     }
     
@@ -52,7 +53,7 @@
      * @return the StreamingCommand object
      */
     public StreamingCommand getStreamingCommand() {
-        return StrCmd;
+        return command;
     }
     
     /* (non-Javadoc)
@@ -79,6 +80,45 @@
         */
 
     }
+    
+    /**
+     * Set the optimized {@link HandleSpec} for the given {@link Handle} of
+     * this operator's {@link StreamingCommand}, working on a clone of it.
+     *
+     * @param handle <code>Handle</code> to optimize
+     * @param spec optimized specification for the handle
+     */
+    public void setOptimizedSpec(Handle handle, String spec) {
+
+        // The reason we need to clone and optimize the clone is the following:
+        // consider a script like this:
+        // define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
+        // define CMD2 `cat`;
+        // A = load 'bla' split by 'file';
+        // B = stream A through CMD1;
+        // C = stream B through CMD1;
+        // D = stream C through CMD2;
+        // store D into 'bla';
+        // In this case CMD1 is represented by a single StreamingCommand object
+        // which will be present as the "command" member in both the
+        // LOStream operators corresponding to B and C. However we want to
+        // optimize only B's input spec since it immediately follows a load
+        // and is conducive to optimization. At this point we clone and make
+        // sure only B's "command" gets optimized while C's "command" remains
+        // untouched.
+
+        StreamingCommand optimizedCommand = (StreamingCommand)command.clone();
+        
+        if (handle == Handle.INPUT) {
+            HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
+            streamInputSpec.setSpec(spec);
+        } else if (handle == Handle.OUTPUT) {
+            HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
+            streamOutputSpec.setSpec(spec);
+        }
+        
+        command = optimizedCommand;
+    }
 
     /* (non-Javadoc)
      * @see 
org.apache.pig.impl.logicalLayer.LogicalOperator#visit(org.apache.pig.impl.logicalLayer.LOVisitor)
@@ -93,7 +133,7 @@
      */
     @Override
     public String name() {
-        return "Stream (" + StrCmd.toString() + ") " + mKey.scope + "-" + 
mKey.id;
+        return "Stream (" + command.toString() + ") " + mKey.scope + "-" + 
mKey.id;
     }
 
     /* (non-Javadoc)

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=695603&r1=695602&r2=695603&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 Mon Sep 15 13:11:12 2008
@@ -73,6 +73,13 @@
         mRules.add(new Rule(nodes, edges, required,
             new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
         
+        // Optimize when LOAD precedes STREAM and the loader class
+        // is the same as the serializer for the STREAM.
+        // Similarly optimize when STREAM is followed by store and the
+        // deserializer class is same as the Storage class.
+        mRules.add(new Rule(nodes, edges, required,
+            new StreamOptimizer(plan, LOSTREAM_CLASSNAME)));
+        
         // Push up limit where ever possible.
         nodes = new ArrayList<String>(1);
         edges = new HashMap<Integer, Integer>();

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java?rev=695603&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
 Mon Sep 15 13:11:12 2008
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.BinaryStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOStream;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * A transformer that optimizes the following scenarios with streaming:
+ * <ul>
+ * <li>LOAD precedes STREAM, the load is not splittable and the loader
+ * class is the same as the serializer of the STREAM;</li>
+ * <li>STREAM is followed by STORE and the deserializer of the STREAM is
+ * the same as the storage class.</li>
+ * </ul>
+ * In both cases the two functions must also be of the same
+ * {@link ReversibleLoadStoreFunc} type and compare equal. The STREAM must
+ * be the LOAD's only successor (respectively the STORE the STREAM's only
+ * successor), because the rewrite changes what every successor receives.
+ * The optimization replaces the loader/serializer (respectively the
+ * storer/deserializer) with {@link BinaryStorage}, which just moves bytes
+ * around.
+ */
+public class StreamOptimizer extends LogicalTransformer {
+
+    private static final Log LOG = LogFactory.getLog(StreamOptimizer.class);
+
+    // Set by check() for the matched stream and consumed by transform()
+    private boolean mOptimizeLoad = false;
+    private boolean mOptimizeStore = false;
+
+    /**
+     * @param plan logical plan to optimize
+     * @param operatorClassName class name of the operator the rule matches;
+     *        unused, as this transformer always expects an {@link LOStream}
+     */
+    public StreamOptimizer(LogicalPlan plan, String operatorClassName) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    @Override
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+        mOptimizeLoad = false;
+        mOptimizeStore = false;
+        LOStream stream = asStream(nodes.get(0));
+
+        // check if either the predecessor of stream is load with
+        // the same loader function as the serializer of stream
+        // OR if the successor of stream is store with the same
+        // storage function as the deserializer of stream.
+        checkLoadOptimizable(stream);
+        checkStoreOptimizable(stream);
+        return mOptimizeLoad || mOptimizeStore;
+    }
+
+    /**
+     * Cast the operator matched by the rule to {@link LOStream}.
+     *
+     * @throws RuntimeException if <code>lo</code> is <code>null</code> or not
+     *         an <code>LOStream</code>
+     */
+    private static LOStream asStream(LogicalOperator lo) {
+        if (!(lo instanceof LOStream)) {
+            // lo may be null, so don't dereference it to build the message
+            throw new RuntimeException("Expected stream, got " +
+                (lo == null ? "null" : lo.getClass().getName()));
+        }
+        return (LOStream)lo;
+    }
+
+    private void checkLoadOptimizable(LOStream stream) {
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(stream);
+        if (predecessors == null || predecessors.isEmpty()) {
+            return;
+        }
+        LogicalOperator predecessor = predecessors.get(0);
+        if (!(predecessor instanceof LOLoad)) {
+            return;
+        }
+        LOLoad load = (LOLoad)predecessor;
+        // Only inputs that are processed as-is (not split) qualify
+        if (load.isSplittable()) {
+            return;
+        }
+        // Replacing the loader changes what every successor of the load
+        // receives, so the stream must be its only successor
+        List<LogicalOperator> loadSuccessors = mPlan.getSuccessors(load);
+        if (loadSuccessors == null || loadSuccessors.size() != 1) {
+            return;
+        }
+
+        // Try and optimize if the load and stream input specs match
+        StreamingCommand command = stream.getStreamingCommand();
+        HandleSpec streamInputSpec = command.getInputSpec();
+        FileSpec loadFileSpec = load.getInputFile();
+        // Instantiate both functions to compare them for equality
+        StoreFunc streamStorer =
+            (StoreFunc)PigContext.instantiateFuncFromSpec(streamInputSpec.getSpec());
+        LoadFunc inputLoader =
+            (LoadFunc)PigContext.instantiateFuncFromSpec(loadFileSpec.getFuncSpec());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("streamStorer:" + streamStorer + "," +
+                "inputLoader:" + inputLoader);
+        }
+        // The serializer must also be a LoadFunc of the same reversible
+        // type as the loader and equivalent to it; then both can be
+        // flipped to BinaryStorage
+        if (isEquivalentReversible(streamStorer, LoadFunc.class, inputLoader)) {
+            mOptimizeLoad = true;
+        }
+    }
+
+    private void checkStoreOptimizable(LOStream stream) {
+        List<LogicalOperator> successors = mPlan.getSuccessors(stream);
+        // Replacing the deserializer changes what every successor of the
+        // stream receives, so the store must be its only successor
+        if (successors == null || successors.size() != 1) {
+            return;
+        }
+        LogicalOperator successor = successors.get(0);
+        if (!(successor instanceof LOStore)) {
+            return;
+        }
+        LOStore store = (LOStore)successor;
+
+        // Try and optimize if the store and stream output specs match
+        StreamingCommand command = stream.getStreamingCommand();
+        HandleSpec streamOutputSpec = command.getOutputSpec();
+        FileSpec storeFileSpec = store.getOutputFile();
+        // Instantiate both functions to compare them for equality
+        LoadFunc streamLoader =
+            (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+        StoreFunc outputStorer =
+            (StoreFunc)PigContext.instantiateFuncFromSpec(storeFileSpec.getFuncSpec());
+        // The deserializer must also be a StoreFunc of the same reversible
+        // type as the storer and equivalent to it; then both can be
+        // flipped to BinaryStorage
+        if (isEquivalentReversible(streamLoader, StoreFunc.class, outputStorer)) {
+            mOptimizeStore = true;
+        }
+    }
+
+    /**
+     * Decide whether a stream's serializer/deserializer and the adjacent
+     * load/store function are interchangeable.
+     *
+     * @param streamFunc function named by the stream's handle spec
+     * @param requiredType interface <code>streamFunc</code> must also
+     *        implement to stand in for <code>planFunc</code>
+     * @param planFunc load or store function of the adjacent operator
+     * @return <code>true</code> if <code>streamFunc</code> implements
+     *         <code>requiredType</code>, <code>planFunc</code> is an instance
+     *         of <code>streamFunc</code>'s class, both are
+     *         {@link ReversibleLoadStoreFunc}s and they compare equal
+     */
+    private static boolean isEquivalentReversible(Object streamFunc,
+            Class<?> requiredType, Object planFunc) {
+        return requiredType.isInstance(streamFunc)
+            && streamFunc.getClass().isInstance(planFunc)
+            && streamFunc instanceof ReversibleLoadStoreFunc
+            && planFunc instanceof ReversibleLoadStoreFunc
+            && streamFunc.equals(planFunc);
+    }
+
+    @Override
+    public void transform(List<LogicalOperator> nodes) throws OptimizerException {
+        try {
+            LOStream stream = asStream(nodes.get(0));
+            if (mOptimizeLoad) {
+                // we have verified in check() that the predecessor was load
+                LOLoad load = (LOLoad)mPlan.getPredecessors(stream).get(0);
+                FileSpec loadFileSpec = load.getInputFile();
+                load.setInputFile(new FileSpec(loadFileSpec.getFileName(),
+                    new FuncSpec(BinaryStorage.class.getName())));
+                stream.setOptimizedSpec(Handle.INPUT, BinaryStorage.class.getName());
+            }
+            if (mOptimizeStore) {
+                // we have verified in check() that the successor was store
+                LOStore store = (LOStore)mPlan.getSuccessors(stream).get(0);
+                FileSpec storeFileSpec = store.getOutputFile();
+                store.setOutputFile(new FileSpec(storeFileSpec.getFileName(),
+                    new FuncSpec(BinaryStorage.class.getName())));
+                stream.setOptimizedSpec(Handle.OUTPUT, BinaryStorage.class.getName());
+            }
+        } catch (Exception e) {
+            throw new OptimizerException(
+                "Unable to optimize load-stream-store optimization", e);
+        }
+    }
+}

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/TestBinaryStorage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBinaryStorage.java?rev=695603&view=auto
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/TestBinaryStorage.java 
(added)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestBinaryStorage.java 
Mon Sep 15 13:11:12 2008
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Test;
+
+
+public class TestBinaryStorage extends PigExecTestCase {
+    private static final String simpleEchoStreamingCommand = 
+        "perl -ne 'print \"$_\"'";
+
+    private static final int MAX_DATA_SIZE = 1024;
+    
+    @Test
+    public void testBinaryStorageWithAsciiData() throws Exception {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"A,1", "B,2", "C,3", "D,2",
+                              "A,5", "B,5", "C,8", "A,8",
+                              "D,8", "A,9"});
+        
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+    
+    @Test
+    public void testBinaryStorageWithBinaryData() throws Exception {
+        // Create input file and fill it with random binary data
+        File input = File.createTempFile("tmp", "dat");
+        byte[] data = new byte[MAX_DATA_SIZE];
+        randomizeBytes(data, 0, data.length);
+        
+        FileOutputStream os = new FileOutputStream(input);
+        os.write(data, 0, data.length);
+        os.close();
+        
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+    
+    private void testBinaryStorage(File input) 
+    throws Exception {
+
+        // Get input data 
+        byte[] inputData = new byte[MAX_DATA_SIZE];
+        int inputLen = 
+            readFileDataIntoBuffer(new FileInputStream(input), inputData);
+        
+        // Pig query to run
+        pigServer.registerQuery("DEFINE CMD `" + 
+                                simpleEchoStreamingCommand + "` " +
+                                "input(stdin using BinaryStorage()) " +
+                                "output(stdout using BinaryStorage());");
+        pigServer.registerQuery("IP = load 'file:" + 
Util.encodeEscape(input.toString()) + "' using " + 
+                                "BinaryStorage() split by 'file';");
+        pigServer.registerQuery("OP = stream IP through CMD;");
+
+        // Save the output using BinaryStorage
+        String output = "./pig.BinaryStorage.out";
+        pigServer.store("OP", output, "BinaryStorage()");
+        
+        // Get output data 
+        InputStream out = FileLocalizer.open(output, 
pigServer.getPigContext());
+        byte[] outputData = new byte[MAX_DATA_SIZE];
+        int outputLen = readFileDataIntoBuffer(out, outputData);
+
+        // Check if the data is the same ...
+        assertEquals(true, 
+                WritableComparator.compareBytes(inputData, 0, inputLen, 
+                                                outputData, 0, outputLen) == 
0);
+        
+        // Cleanup
+        out.close();
+        pigServer.deleteFile(output);
+    }
+
+    private static void randomizeBytes(byte[] data, int offset, int length) {
+        Random random = new Random();
+        for(int i=offset + length - 1; i >= offset; --i) {
+            data[i] = (byte) random.nextInt(256);
+        }
+    }
+
+    private static int readFileDataIntoBuffer(InputStream is, byte[] buffer) 
+    throws IOException {
+        int n = 0;
+        int off = 0, len = buffer.length;
+        while (len > 0 && (n = is.read(buffer, off, len)) != -1) {
+            off += n;
+            len -= n;
+        }
+        return off;
+    }
+}


Reply via email to