Author: olga
Date: Thu Sep  4 14:41:21 2008
New Revision: 692261

URL: http://svn.apache.org/viewvc?rev=692261&view=rev
Log:
missing files from stream merge

Added:
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java?rev=692261&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
 Thu Sep  4 14:41:21 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This visitor visits the MRPlan and does the following
+ * for each MROper
+ *  - If the map plan or the reduce plan of the MROper has
+ *  a POStream in it, this marks in the MROper whether the map 
+ * has a POStream or if the reduce has a POStream.
+ *  
+ */
+public class MRStreamHandler extends MROpPlanVisitor {
+
+    /**
+     * @param plan MR plan to visit
+     */
+    public MRStreamHandler(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        
+        StreamChecker checker = new StreamChecker(mr.mapPlan);
+        checker.visit();
+        if(checker.isStreamPresent()) {
+            mr.setStreamInMap(true);            
+        }
+        
+        checker = new StreamChecker(mr.reducePlan);
+        checker.visit();
+        if(checker.isStreamPresent()) {
+            mr.setStreamInReduce(true);            
+        }      
+        
+    }
+
+    class StreamChecker extends PhyPlanVisitor {
+        
+        private boolean streamPresent = false;
+        public StreamChecker(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
+        }
+        
+        /* (non-Javadoc)
+         * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
+         */
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            // stream present
+            streamPresent = true;
+        }
+
+        /**
+         * @return if stream is present
+         */
+        public boolean isStreamPresent() {
+            return streamPresent;
+        }
+    }
+}
+

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=692261&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java 
(added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java 
Thu Sep  4 14:41:21 2008
@@ -0,0 +1,114 @@
+/**
+ * 
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+/**
+ * [EMAIL PROTECTED] LOStream} represents the specification of an external
+ * command to be executed in a Pig Query. 
+ * 
+ * <code>LOStream</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class LOStream extends LogicalOperator {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+    // the StreamingCommand object for the
+    // Stream Operator this operator represents
+    private StreamingCommand StrCmd;
+    //private LogicalOperator input;
+    private ExecutableManager executableManager;
+    /**
+     * Create a new <code>LOStream</code> with the given command.
+     * 
+     * @param plan the logical plan this operator is a part of
+     * @param k the operator key for this operator
+     * @param pigContext the pigContext object
+     * @param argv parsed arguments of the <code>command</code>
+     */
+    public LOStream(LogicalPlan plan, OperatorKey k, LogicalOperator input, 
ExecutableManager exeManager, StreamingCommand cmd) {
+        super(plan, k);
+        //this.input = input;
+        this.StrCmd = cmd;
+        this.executableManager = exeManager;
+    }
+    
+    /**
+     * Get the StreamingCommand object associated
+     * with this operator
+     * 
+     * @return the StreamingCommand object
+     */
+    public StreamingCommand getStreamingCommand() {
+        return StrCmd;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#getSchema()
+     */
+    @Override
+    public Schema getSchema() throws FrontendException {
+        return mSchema;
+        /*
+        if (!mIsSchemaComputed) {
+            /*
+            LogicalOperator input = mPlan.getPredecessors(this).get(0);
+            ArrayList<Schema.FieldSchema> fss = new 
ArrayList<Schema.FieldSchema>();
+            try {
+                mSchema = input.getSchema();
+                mIsSchemaComputed = true;
+            } catch (FrontendException ioe) {
+                mSchema = null;
+                mIsSchemaComputed = false;
+                throw ioe;
+            }
+        }
+        return mSchema;
+        */
+
+    }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.pig.impl.logicalLayer.LogicalOperator#visit(org.apache.pig.impl.logicalLayer.LOVisitor)
+     */
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+        return "Stream (" + StrCmd.toString() + ") " + mKey.scope + "-" + 
mKey.id;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /**
+     * @return the ExecutableManager
+     */
+    public ExecutableManager getExecutableManager() {
+        return executableManager;
+    }
+
+}


Reply via email to