Author: pradeepkth
Date: Fri Oct 30 20:40:31 2009
New Revision: 831449
URL: http://svn.apache.org/viewvc?rev=831449&view=rev
Log:
PIG-1063: Pig does not call checkOutputSpecs() on OutputFormat provided by
StoreFunc in the multistore case (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 20:40:31 2009
@@ -107,6 +107,9 @@
BUG FIXES
+PIG-1063: Pig does not call checkOutputSpecs() on OutputFormat provided by
+StoreFunc in the multistore case (pradeepkth)
+
PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
should never be serialized (rding via pradeepkth)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
Fri Oct 30 20:40:31 2009
@@ -19,7 +19,10 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -31,12 +34,17 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.tools.bzip2r.CBZip2OutputStream;
/**
@@ -88,9 +96,79 @@
return new PigRecordWriter(fs, new Path(outputDir, name), store);
}
+ @SuppressWarnings("deprecation")
public void checkOutputSpecs(FileSystem fs, JobConf job) throws
IOException {
- // TODO We really should validate things here
- return;
+
+ // check if there is any storeFunc which internally has an
+ // OutputFormat - if it does, we should be delegating this call
+ // to it after setting up the StoreFunc and StoreConfig properties
+ // in the JobConf.
+ PhysicalPlan mp = (PhysicalPlan) ObjectSerializer.deserialize(
+ job.get("pig.mapPlan"));
+ List<POStore> mapStores = PlanHelper.getStores(mp);
+ PhysicalPlan rp = (PhysicalPlan) ObjectSerializer.deserialize(
+ job.get("pig.reducePlan"));
+ List<POStore> reduceStores = new ArrayList<POStore>();
+ if(rp != null) {
+ reduceStores = PlanHelper.getStores(rp);
+ }
+
+ // In the case of single store in the job, we remove the store
+ // out of the map/reduce plan and in that case, if the store had
+ // an OutputFormat, we would have set that to be the Job's
+ // OutputFormat (relevant code in JobControlCompiler). We only need
+ // to handle multi store case - to be safe, we check for non zero
+ // store size
+ if(mapStores.size() > 0) {
+ for (POStore store : mapStores) {
+ checkOutputSpecsHelper(fs, store, job);
+ }
+ }
+ if(reduceStores.size() > 0) {
+ for (POStore store : reduceStores) {
+ checkOutputSpecsHelper(fs, store, job);
+ }
+ }
+ }
+
+ /**
+ * @param fs
+ * @param store
+ * @param job
+ * @throws IOException
+ */
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private void checkOutputSpecsHelper(FileSystem fs, POStore store, JobConf
job)
+ throws IOException {
+ StoreFunc storeFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(
+ store.getSFile().getFuncSpec());
+ Class sPrepClass = null;
+ try {
+ sPrepClass = storeFunc.getStorePreparationClass();
+ } catch(AbstractMethodError e) {
+ // this is for backward compatibility wherein some old StoreFunc
+ // which does not implement getStorePreparationClass() is being
+ // used. In this case, we want to just use PigOutputFormat
+ sPrepClass = null;
+ }
+ if(sPrepClass != null &&
OutputFormat.class.isAssignableFrom(sPrepClass)) {
+
+ StoreConfig storeConfig = new StoreConfig(store.getSFile().
+ getFileName(), store.getSchema(), store.getSortInfo());
+ // make a copy of the conf since we may be dealing with multiple
+ // stores. Set storeFunc and StoreConfig
+ // pertaining to this store in the copy and use it
+ JobConf confCopy = new JobConf(job);
+ confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+ store.getSFile().getFuncSpec().toString()));
+ confCopy.set(JobControlCompiler.PIG_STORE_CONFIG,
+ ObjectSerializer.serialize(storeConfig));
+ confCopy.setOutputFormat(sPrepClass);
+ OutputFormat of = confCopy.getOutputFormat();
+ of.checkOutputSpecs(fs, confCopy);
+
+ }
+
}
static public class PigRecordWriter implements
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri Oct 30
20:40:31 2009
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
@@ -36,6 +37,8 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.executionengine.ExecJob;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
@@ -45,6 +48,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -59,8 +63,14 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.util.Progressable;
public class TestMultiQuery extends TestCase {
@@ -1904,7 +1914,117 @@
Assert.fail();
}
}
+
+ /**
+ * Test that pig calls checkOutputSpecs() method of the OutputFormat (if the
+ * StoreFunc defines an OutputFormat as the return value of
+ * {@link StoreFunc#getStorePreparationClass()}
+ * @throws IOException
+ */
+ @Test
+ public void testMultiStoreWithOutputFormat() throws IOException {
+ Util.createInputFile(cluster, "input.txt", new String[] {"hello",
"bye"});
+ String query = "a = load 'input.txt';" +
+ "b = filter a by $0 < 10;" +
+ "store b into 'output1' using
"+DummyStoreWithOutputFormat.class.getName()+"();" +
+ "c = group a by $0;" +
+ "d = foreach c generate group, COUNT(a.$0);" +
+ "store d into 'output2' using
"+DummyStoreWithOutputFormat.class.getName()+"();" ;
+ myPig.setBatchOn();
+ Util.registerMultiLineQuery(myPig, query);
+ myPig.executeBatch();
+
+ // check that files were created as a result of the
+ // checkOutputSpecs() method of the OutputFormat being called
+ FileSystem fs = cluster.getFileSystem();
+ assertEquals(true, fs.exists(new
Path("output1_checkOutputSpec_test")));
+ assertEquals(true, fs.exists(new
Path("output2_checkOutputSpec_test")));
+ Util.deleteFile(cluster, "input.txt");
+ Util.deleteFile(cluster, "output1_checkOutputSpec_test");
+ Util.deleteFile(cluster, "output2_checkOutputSpec_test");
+ }
+
+ public static class DummyStoreWithOutputFormat implements StoreFunc {
+
+ /**
+ *
+ */
+ public DummyStoreWithOutputFormat() {
+ // TODO Auto-generated constructor stub
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#bindTo(java.io.OutputStream)
+ */
+ @Override
+ public void bindTo(OutputStream os) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#finish()
+ */
+ @Override
+ public void finish() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Class getStorePreparationClass() throws IOException {
+ return DummyOutputFormat.class;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public static class DummyOutputFormat
+ implements OutputFormat<WritableComparable, Tuple> {
+
+ public DummyOutputFormat() {
+
+ }
+ /* (non-Javadoc)
+ * @see
org.apache.hadoop.mapred.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem,
org.apache.hadoop.mapred.JobConf)
+ */
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ StoreConfig sConfig = MapRedUtil.getStoreConfig(job);
+ FileSystem fs = FileSystem.get(job);
+ // create a file to test that this method got called
+ fs.create(new Path(sConfig.getLocation() +
"_checkOutputSpec_test"));
+ }
+
+ /* (non-Javadoc)
+ * @see
org.apache.hadoop.mapred.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem,
org.apache.hadoop.mapred.JobConf, java.lang.String,
org.apache.hadoop.util.Progressable)
+ */
+ @Override
+ public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+ FileSystem ignored, JobConf job, String name,
+ Progressable progress) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+
+
//
--------------------------------------------------------------------------
// Helper methods