Author: pradeepkth
Date: Thu Mar 5 19:30:47 2009
New Revision: 750559
URL: http://svn.apache.org/viewvc?rev=750559&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth) - adding new files and
removing a file which was supposed to be deleted
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
Removed:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=750559&view=auto
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
(added)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
Thu Mar 5 19:30:47 2009
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.data.Tuple;
+
+import java.text.NumberFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+
+/**
+ * This class is used to have a POStore write to DFS via a output
+ * collector/record writer. It sets up a modified job configuration to
+ * force a write to a specific subdirectory of the main output
+ * directory. This is done so that multiple output directories can be
+ * used in the same job. Since the hadoop framework requires a
+ * reporter to be available to create the record writer the main
+ * function (createStoreFunc) has to be called from within a map or
+ * reduce function.
+ */
+public class MapReducePOStoreImpl extends POStoreImpl {
+
+ private PigContext pc;
+ private StoreFunc storer;
+ private FileSpec sFile;
+ private Reporter reporter;
+ private RecordWriter writer;
+ private JobConf job;
+
+ private final Log log = LogFactory.getLog(getClass());
+
+ public MapReducePOStoreImpl(JobConf job) {
+ this.job = job;
+ }
+
+ public void setReporter(Reporter reporter) {
+ this.reporter = reporter;
+ }
+
+ @Override
+ public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+
+ // set up a new job conf
+ JobConf outputConf = new JobConf(job);
+ String tmpPath = PlanHelper.makeStoreTmpPath(sFile.getFileName());
+
+ // Right now we're always using PigOutputFormat.
+ outputConf.setOutputFormat(PigOutputFormat.class);
+
+ // PigOuputFormat will look for pig.storeFunc to actually
+ // write stuff out.
+ outputConf.set("pig.storeFunc", sFile.getFuncSpec().toString());
+
+ // We set the output dir to the final location of the output,
+ // the output dir set in the original job config points to the
+ // temp location for the multi store.
+ Path outputDir = new
Path(sFile.getFileName()).makeQualified(FileSystem.get(outputConf));
+ outputConf.set("mapred.output.dir", outputDir.toString());
+
+ // The workpath is set to a unique-per-store subdirectory of
+ // the current working directory.
+ String workPath = outputConf.get("mapred.work.output.dir");
+ outputConf.set("mapred.work.output.dir",
+ new Path(workPath, tmpPath).toString());
+ OutputFormat outputFormat = outputConf.getOutputFormat();
+
+ // Generate a unique part name (part-<task_partition_number>).
+ String fileName = getPartName(outputConf);
+
+ // create a new record writer
+ writer = outputFormat.getRecordWriter(FileSystem.get(outputConf),
+ outputConf, fileName, reporter);
+
+ // return an output collector using the writer we just created.
+ return new StoreFuncAdaptor(new OutputCollector()
+ {
+ @SuppressWarnings({"unchecked"})
+ public void collect(Object key, Object value) throws
IOException {
+ writer.write(key,value);
+ }
+ });
+ }
+
+ @Override
+ public void tearDown() throws IOException{
+ if (writer != null) {
+ writer.close(reporter);
+ writer = null;
+ }
+ }
+
+ @Override
+ public void cleanUp() throws IOException{
+ if (writer != null) {
+ writer.close(reporter);
+ writer = null;
+ }
+ }
+
+ private String getPartName(JobConf conf) {
+ int partition = conf.getInt("mapred.task.partition", -1);
+
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+
+ return "part-" + numberFormat.format(partition);
+ }
+
+ /**
+ * This is a simple adaptor class to allow the physical store operator
+ * to be used in the map reduce case. It will allow to use an output
+ * collector instead of an output stream to write tuples.
+ */
+ private class StoreFuncAdaptor implements StoreFunc {
+ private OutputCollector collector;
+
+ public StoreFuncAdaptor(OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void bindTo(OutputStream os) throws IOException {
+ }
+
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ collector.collect(null,f);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ }
+ }
+}
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=750559&view=auto
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
(added)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
Thu Mar 5 19:30:47 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.io.FileSpec;
+
+/**
+ * This class is used to specify the actual behavior of the store
+ * operator just when ready to start execution.
+ */
+public abstract class POStoreImpl {
+ /**
+ * Set up the storer
+ * @param sFile - The file the store should write to
+ * @throws IOException
+ */
+ public abstract StoreFunc createStoreFunc(FileSpec sFile) throws
IOException;
+
+ /**
+ * At the end of processing, the outputstream is closed
+ * using this method
+ * @throws IOException
+ */
+ public void tearDown() throws IOException{
+ }
+
+ /**
+ * To perform cleanup when there is an error.
+ * Uses the FileLocalizer method which only
+ * deletes the file but not the dirs created
+ * with it.
+ * @throws IOException
+ */
+ public void cleanUp() throws IOException{
+ }
+}
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=750559&view=auto
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
(added)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
Thu Mar 5 19:30:47 2009
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
+
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import java.net.URI;
+
+/**
+ * Utility class with a few helper functions to deal with physical plans.
+ */
+public class PlanHelper {
+
+    // Use the class literal; no instance is needed to obtain the logger.
+    private final static Log log = LogFactory.getLog(PlanHelper.class);
+
+    /** Not instantiable: this class only has static helpers. */
+    private PlanHelper() {}
+
+    /**
+     * Get all the store operators in the plan. Only the leaves of the
+     * plan are inspected.
+     * @param plan
+     * @return List of stores (could be empty)
+     */
+    public static List<POStore> getStores(PhysicalPlan plan) {
+        List<POStore> stores = new LinkedList<POStore>();
+        List<PhysicalOperator> leaves = plan.getLeaves();
+        for (PhysicalOperator leaf: leaves) {
+            if (leaf instanceof POStore) {
+                stores.add((POStore)leaf);
+            }
+        }
+        return stores;
+    }
+
+    /**
+     * Get all the load operators in the plan. Only the roots of the
+     * plan are inspected.
+     * @param plan
+     * @return List of loads (could be empty)
+     */
+    public static List<POLoad> getLoads(PhysicalPlan plan) {
+        List<POLoad> loads = new LinkedList<POLoad>();
+        List<PhysicalOperator> roots = plan.getRoots();
+        for (PhysicalOperator root: roots) {
+            if (root instanceof POLoad) {
+                loads.add((POLoad)root);
+            }
+        }
+        return loads;
+    }
+
+    /**
+     * Creates a relative path that can be used to build a temporary
+     * place to store the output from a number of map-reduce tasks.
+     * Only the path component of the location is used; it is prefixed
+     * with "abs" if the location is absolute and with "rel/" otherwise.
+     * @param orig the location of the store
+     * @return the relative temporary path for that location
+     */
+    public static String makeStoreTmpPath(String orig) {
+        Path path = new Path(orig);
+        // URI is immutable: normalize() returns the normalized URI and
+        // leaves the receiver untouched, so the result has to be kept.
+        URI uri = path.toUri().normalize();
+
+        String pathStr = uri.getPath();
+        if (path.isAbsolute()) {
+            return new Path("abs"+pathStr).toString();
+        } else {
+            return new Path("rel/"+pathStr).toString();
+        }
+    }
+}
\ No newline at end of file
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java?rev=750559&view=auto
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
(added)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
Thu Mar 5 19:30:47 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+
+/**
+ * This class implements the behavior of a POStore in the local
+ * execution engine. It creates and manages the store function and the
+ * output stream of the store.
+ */
+public class LocalPOStoreImpl extends POStoreImpl {
+
+    private OutputStream os;
+    private final PigContext pc;
+    private StoreFunc storer;
+    private FileSpec sFile;
+
+    /**
+     * @param pc the context used to create, look up and delete the
+     * output file
+     */
+    public LocalPOStoreImpl(PigContext pc) {
+        this.pc = pc;
+    }
+
+    /**
+     * Instantiates the store function named by the file spec, creates
+     * the output stream for the file and binds the two together.
+     * @param sFile - The file the store should write to
+     * @return the store function, bound to the output stream
+     * @throws IOException
+     */
+    @Override
+    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+        this.sFile = sFile;
+        StoreFunc newStorer =
+            (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        os = FileLocalizer.create(sFile.getFileName(), pc);
+        newStorer.bindTo(os);
+        // Only publish the storer once it is bound to a stream, so that
+        // tearDown never finishes a storer that has no output.
+        storer = newStorer;
+        return storer;
+    }
+
+    /**
+     * Finishes the store function and closes the output stream. The
+     * stream is closed even if finishing fails. Does nothing if no
+     * store function has been created.
+     * @throws IOException
+     */
+    @Override
+    public void tearDown() throws IOException{
+        try {
+            if (storer != null) {
+                storer.finish();
+            }
+        } finally {
+            closeStream();
+        }
+    }
+
+    /**
+     * Closes the output stream and deletes the output file if it
+     * exists. The delete is attempted even if closing the stream
+     * fails. Does nothing if no store function has been created.
+     * @throws IOException
+     */
+    @Override
+    public void cleanUp() throws IOException{
+        try {
+            // Close rather than just flush, so that the stream is not
+            // left open on a file that is about to be deleted.
+            closeStream();
+        } finally {
+            if (sFile != null) {
+                String fName = sFile.getFileName();
+                if(FileLocalizer.fileExists(fName,pc)) {
+                    FileLocalizer.delete(fName,pc);
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the output stream, if there is one, and drops the
+     * references to the stream and the storer so that later calls to
+     * tearDown or cleanUp do not touch a closed stream.
+     */
+    private void closeStream() throws IOException {
+        if (os != null) {
+            try {
+                os.close();
+            } finally {
+                os = null;
+                storer = null;
+            }
+        }
+    }
+}