Author: olga Date: Thu Sep 4 14:41:21 2008 New Revision: 692261 URL: http://svn.apache.org/viewvc?rev=692261&view=rev Log: missing files from stream merge
Added:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java?rev=692261&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java Thu Sep 4 14:41:21 2008
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This visitor visits the MRPlan and, for each MROper, marks in the
+ * MROper whether its map plan has a {@link POStream} and whether its
+ * reduce plan has a {@link POStream}.
+ */
+public class MRStreamHandler extends MROpPlanVisitor {
+
+    /**
+     * @param plan MR plan to visit
+     */
+    public MRStreamHandler(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    /**
+     * Checks the map plan and then the reduce plan of the given operator
+     * and sets the matching stream flag on the operator for each plan
+     * that has a {@link POStream} in it. A flag is only ever set to
+     * <code>true</code> here; it is not touched when no stream is found.
+     *
+     * @param mr the map-reduce operator to inspect and mark
+     * @throws VisitorException propagated from visiting the physical plans
+     */
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        if (containsStream(mr.mapPlan)) {
+            mr.setStreamInMap(true);
+        }
+        if (containsStream(mr.reducePlan)) {
+            mr.setStreamInReduce(true);
+        }
+    }
+
+    /**
+     * Runs a fresh {@link StreamChecker} over the given physical plan.
+     *
+     * @param plan the physical plan to search
+     * @return true if the checker visited a {@link POStream}
+     * @throws VisitorException propagated from visiting the plan
+     */
+    private boolean containsStream(PhysicalPlan plan) throws VisitorException {
+        StreamChecker checker = new StreamChecker(plan);
+        checker.visit();
+        return checker.isStreamPresent();
+    }
+
+    /**
+     * Physical plan visitor that records whether it has visited a
+     * {@link POStream}.
+     */
+    class StreamChecker extends PhyPlanVisitor {
+
+        private boolean streamPresent = false;
+
+        public StreamChecker(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream)
+         */
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            // stream present
+            streamPresent = true;
+        }
+
+        /**
+         * @return if stream is present
+         */
+        public boolean isStreamPresent() {
+            return streamPresent;
+        }
+    }
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=692261&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java Thu Sep 4 14:41:21 2008
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+/**
+ * {@link LOStream} represents the specification of an external
+ * command to be executed in a Pig Query.
+ *
+ * <code>LOStream</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class LOStream extends LogicalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // the StreamingCommand object for the
+    // Stream Operator this operator represents
+    private final StreamingCommand StrCmd;
+    private final ExecutableManager executableManager;
+
+    /**
+     * Create a new <code>LOStream</code> with the given command.
+     *
+     * @param plan the logical plan this operator is a part of
+     * @param k the operator key for this operator
+     * @param input accepted but not used by this constructor
+     * @param exeManager the ExecutableManager handed back by
+     *                   {@link #getExecutableManager()}
+     * @param cmd the StreamingCommand this operator represents
+     */
+    public LOStream(LogicalPlan plan, OperatorKey k, LogicalOperator input, ExecutableManager exeManager, StreamingCommand cmd) {
+        super(plan, k);
+        this.StrCmd = cmd;
+        this.executableManager = exeManager;
+    }
+
+    /**
+     * Get the StreamingCommand object associated
+     * with this operator
+     *
+     * @return the StreamingCommand object
+     */
+    public StreamingCommand getStreamingCommand() {
+        return StrCmd;
+    }
+
+    /**
+     * Returns <code>mSchema</code> as is; no schema is computed by
+     * this method.
+     *
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#getSchema()
+     */
+    @Override
+    public Schema getSchema() throws FrontendException {
+        return mSchema;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#visit(org.apache.pig.impl.logicalLayer.LOVisitor)
+     */
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+        // plain concatenation so that a null command prints as "null"
+        // instead of failing with a NullPointerException
+        return "Stream (" + StrCmd + ") " + mKey.scope + "-" + mKey.id;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /**
+     * @return the ExecutableManager
+     */
+    public ExecutableManager getExecutableManager() {
+        return executableManager;
+    }
+
+}