Author: pradeepkth
Date: Thu Mar  5 19:30:47 2009
New Revision: 750559

URL: http://svn.apache.org/viewvc?rev=750559&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth) - adding new files and 
removing a file which was supposed to be deleted

Added:
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
Removed:
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=750559&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 Thu Mar  5 19:30:47 2009
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.data.Tuple;
+
+import java.text.NumberFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+
+/**
+ * This class is used to have a POStore write to DFS via an output
+ * collector/record writer. It sets up a modified job configuration to
+ * force a write to a specific subdirectory of the main output
+ * directory. This is done so that multiple output directories can be
+ * used in the same job. Since the hadoop framework requires a
+ * reporter to be available to create the record writer, the main
+ * function (createStoreFunc) has to be called from within a map or
+ * reduce function, after {@link #setReporter(Reporter)} has been called.
+ */
+public class MapReducePOStoreImpl extends POStoreImpl {
+
+    /** Configuration of the running job; it is copied, never modified. */
+    private final JobConf job;
+
+    /** Reporter passed to the record writer when it is created and closed. */
+    private Reporter reporter;
+
+    /** Writer of the current store; null before creation and after close. */
+    private RecordWriter writer;
+
+    public MapReducePOStoreImpl(JobConf job) {
+        this.job = job;
+    }
+
+    public void setReporter(Reporter reporter) {
+        this.reporter = reporter;
+    }
+
+    /**
+     * Creates a store function that writes tuples through a record writer
+     * obtained from {@link PigOutputFormat}.
+     *
+     * @param sFile the store's file spec: its file name is the final output
+     *        location and its func spec is handed to PigOutputFormat via
+     *        the "pig.storeFunc" property
+     * @return a StoreFunc forwarding every tuple to the record writer
+     * @throws IOException if the file system or record writer cannot be
+     *         obtained
+     */
+    @Override
+    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+
+        // Set up a new job conf so the original one stays untouched.
+        JobConf outputConf = new JobConf(job);
+        String tmpPath = PlanHelper.makeStoreTmpPath(sFile.getFileName());
+
+        // Right now we're always using PigOutputFormat.
+        outputConf.setOutputFormat(PigOutputFormat.class);
+
+        // PigOutputFormat will look for pig.storeFunc to actually
+        // write stuff out.
+        outputConf.set("pig.storeFunc", sFile.getFuncSpec().toString());
+
+        FileSystem fs = FileSystem.get(outputConf);
+
+        // We set the output dir to the final location of the output,
+        // the output dir set in the original job config points to the
+        // temp location for the multi store.
+        Path outputDir = new Path(sFile.getFileName()).makeQualified(fs);
+        outputConf.set("mapred.output.dir", outputDir.toString());
+
+        // The work path is set to a unique-per-store subdirectory of
+        // the current working directory.
+        String workPath = outputConf.get("mapred.work.output.dir");
+        outputConf.set("mapred.work.output.dir",
+                       new Path(workPath, tmpPath).toString());
+        OutputFormat outputFormat = outputConf.getOutputFormat();
+
+        // Generate a unique part name (part-<task_partition_number>).
+        String fileName = getPartName(outputConf);
+
+        // Create a new record writer.
+        writer = outputFormat.getRecordWriter(fs, outputConf, fileName, reporter);
+
+        // Return a store func whose collector writes to the record writer
+        // held in the 'writer' field.
+        return new StoreFuncAdaptor(new OutputCollector() {
+            @SuppressWarnings({"unchecked"})
+            public void collect(Object key, Object value) throws IOException {
+                writer.write(key, value);
+            }
+        });
+    }
+
+    /**
+     * Closes the record writer, if one is open.
+     * @throws IOException if closing the writer fails
+     */
+    @Override
+    public void tearDown() throws IOException {
+        closeWriter();
+    }
+
+    /**
+     * Closes the record writer, if one is open, after an error.
+     * @throws IOException if closing the writer fails
+     */
+    @Override
+    public void cleanUp() throws IOException {
+        closeWriter();
+    }
+
+    /** Closes and forgets the current writer; a no-op when none is open. */
+    private void closeWriter() throws IOException {
+        if (writer != null) {
+            writer.close(reporter);
+            writer = null;
+        }
+    }
+
+    /**
+     * Builds the part file name from the task partition, zero padded to
+     * five digits (e.g. "part-00003"). If "mapred.task.partition" is not
+     * set, -1 is used, which yields "part--00001".
+     */
+    private static String getPartName(JobConf conf) {
+        int partition = conf.getInt("mapred.task.partition", -1);
+
+        // A fixed locale keeps the digits ASCII regardless of the JVM's
+        // default locale, so file names are predictable.
+        NumberFormat numberFormat = NumberFormat.getInstance(java.util.Locale.ROOT);
+        numberFormat.setMinimumIntegerDigits(5);
+        numberFormat.setGroupingUsed(false);
+
+        return "part-" + numberFormat.format(partition);
+    }
+
+    /**
+     * This is a simple adaptor class to allow the physical store operator
+     * to be used in the map reduce case. It allows an output collector to
+     * be used instead of an output stream to write tuples.
+     */
+    private static class StoreFuncAdaptor implements StoreFunc {
+        private final OutputCollector collector;
+
+        public StoreFuncAdaptor(OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        /** No-op: output goes to the collector, not to a stream. */
+        @Override
+        public void bindTo(OutputStream os) throws IOException {
+        }
+
+        /** Emits the tuple as the value with a null key. */
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            collector.collect(null, f);
+        }
+
+        /** No-op: the record writer is closed by the enclosing impl. */
+        @Override
+        public void finish() throws IOException {
+        }
+    }
+}

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=750559&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
 Thu Mar  5 19:30:47 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.io.FileSpec;
+
+/**
+ * This class is used to specify the actual behavior of the store
+ * operator just when ready to start execution. Subclasses decide how
+ * the store function is created and how its output is finished or
+ * cleaned up.
+ */
+public abstract class POStoreImpl {
+    /**
+     * Set up the storer.
+     * @param sFile - The file the store should write to
+     * @return the store function the store operator should write tuples to
+     * @throws IOException if the storer or its output cannot be set up
+     */
+    public abstract StoreFunc createStoreFunc(FileSpec sFile) throws IOException;
+
+    /**
+     * Called at the end of processing to finish the store and release
+     * its output (for example, close an output stream). The default
+     * implementation does nothing.
+     * @throws IOException if finishing or releasing the output fails
+     */
+    public void tearDown() throws IOException{
+    }
+
+    /**
+     * Called to perform cleanup when there is an error (for example,
+     * to remove partially written output). The default implementation
+     * does nothing; subclasses decide what, if anything, to remove.
+     * @throws IOException if the cleanup fails
+     */
+    public void cleanUp() throws IOException{
+    }
+}

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=750559&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 Thu Mar  5 19:30:47 2009
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
+
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import java.net.URI;
+
+/**
+ * Utility class with a few helper functions to deal with physical plans.
+ */
+public class PlanHelper {
+
+    // Obtain the class literal directly instead of instantiating the
+    // utility class inside a static initializer.
+    private final static Log log = LogFactory.getLog(PlanHelper.class);
+
+    private PlanHelper() {}
+
+    /**
+     * Get all the store operators in the plan. Only the leaves of the
+     * plan are inspected.
+     * @param plan the physical plan to search
+     * @return List of stores (could be empty)
+     */
+    public static List<POStore> getStores(PhysicalPlan plan) {
+        List<POStore> stores = new LinkedList<POStore>();
+        for (PhysicalOperator leaf : plan.getLeaves()) {
+            if (leaf instanceof POStore) {
+                stores.add((POStore) leaf);
+            }
+        }
+        return stores;
+    }
+
+    /**
+     * Get all the load operators in the plan. Only the roots of the
+     * plan are inspected.
+     * @param plan the physical plan to search
+     * @return List of loads (could be empty)
+     */
+    public static List<POLoad> getLoads(PhysicalPlan plan) {
+        List<POLoad> loads = new LinkedList<POLoad>();
+        for (PhysicalOperator root : plan.getRoots()) {
+            if (root instanceof POLoad) {
+                loads.add((POLoad) root);
+            }
+        }
+        return loads;
+    }
+
+    /**
+     * Creates a relative path that can be used to build a temporary
+     * place to store the output from a number of map-reduce tasks.
+     * Only the (normalized) path component of {@code orig} is kept; any
+     * scheme or authority is dropped.
+     * @param orig the store location
+     * @return "abs" followed by the path when {@code orig} is absolute,
+     *         otherwise "rel/" followed by the path
+     */
+    public static String makeStoreTmpPath(String orig) {
+        Path path = new Path(orig);
+        // URI is immutable: normalize() returns a new instance, so its
+        // result has to be used for the normalization to take effect.
+        URI uri = path.toUri().normalize();
+
+        String pathStr = uri.getPath();
+        String prefix = path.isAbsolute() ? "abs" : "rel/";
+        return new Path(prefix + pathStr).toString();
+    }
+}
\ No newline at end of file

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java?rev=750559&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
 Thu Mar  5 19:30:47 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+
+/**
+ * This class implements the behavior of a POStore in the local
+ * execution engine. It creates and manages the store function and the
+ * output stream of the store.
+ */
+public class LocalPOStoreImpl extends POStoreImpl {
+
+    private OutputStream os;
+    private final PigContext pc;
+    private StoreFunc storer;
+    private FileSpec sFile;
+
+    public LocalPOStoreImpl(PigContext pc) {
+        this.pc = pc;
+    }
+
+    /**
+     * Instantiates the store function named by the file spec, opens the
+     * output file through FileLocalizer and binds the storer to it.
+     * @param sFile the file (and store function) to write to
+     * @return the bound store function
+     * @throws IOException if the output file cannot be created or bound
+     */
+    @Override
+    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+        this.sFile = sFile;
+        storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        os = FileLocalizer.create(sFile.getFileName(), pc);
+        storer.bindTo(os);
+        return storer;
+    }
+
+    /**
+     * Finishes the storer and closes the output stream. Does nothing if
+     * the output stream was never opened.
+     * @throws IOException if finishing the storer or closing the stream fails
+     */
+    @Override
+    public void tearDown() throws IOException{
+        if (os == null) {
+            // createStoreFunc was never called or failed before opening
+            // the stream: there is nothing to finish or close.
+            return;
+        }
+        try {
+            storer.finish();
+        } finally {
+            // Close the stream even when finish() throws, so it is not leaked.
+            os.close();
+        }
+    }
+
+    /**
+     * Flushes the output stream (if open) and deletes the output file if
+     * it exists. Does nothing if createStoreFunc was never called.
+     * @throws IOException if flushing or deleting fails
+     */
+    @Override
+    public void cleanUp() throws IOException{
+        if (sFile == null) {
+            return;
+        }
+        if (os != null) {
+            os.flush();
+        }
+        String fName = sFile.getFileName();
+        if(FileLocalizer.fileExists(fName,pc)) {
+            FileLocalizer.delete(fName,pc);
+        }
+    }
+}


Reply via email to