Author: pradeepkth
Date: Fri Oct 30 20:40:31 2009
New Revision: 831449

URL: http://svn.apache.org/viewvc?rev=831449&view=rev
Log:
PIG-1063: Pig does not call checkOutputSpecs() on OutputFormat provided by
StoreFunc in the multistore case (pradeepkth)
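
Background: in the multi-query (multi-store) case, several POStore operators
share a single map-reduce job, so no store is pulled out of the plan and
PigOutputFormat.checkOutputSpecs() used to return without ever consulting the
OutputFormat that a StoreFunc can supply via getStorePreparationClass(). The
fix below deserializes the map and reduce plans, collects every store, and
delegates the check once per store. A hedged sketch of the resulting
StoreFunc/OutputFormat contract follows the PigOutputFormat.java diff.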

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 20:40:31 2009
@@ -107,6 +107,9 @@
 
 BUG FIXES
 
+PIG-1063: Pig does not call checkOutputSpecs() on OutputFormat provided by
+StoreFunc in the multistore case (pradeepkth)
+
 PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
 should never be serialized (rding via pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Oct 30 20:40:31 2009
@@ -19,7 +19,10 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -31,12 +34,17 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 
 /**
@@ -88,9 +96,79 @@
         return new PigRecordWriter(fs, new Path(outputDir, name), store);
     }
 
+    @SuppressWarnings("deprecation")
     public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
-        // TODO We really should validate things here
-        return;
+        
+        // check if there is any StoreFunc which internally has an
+        // OutputFormat - if it does, we should delegate this call to it
+        // after setting up the StoreFunc and StoreConfig properties
+        // in the JobConf.
+        PhysicalPlan mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
+        List<POStore> mapStores = PlanHelper.getStores(mp);
+        PhysicalPlan rp = (PhysicalPlan) ObjectSerializer.deserialize(
+                    job.get("pig.reducePlan"));
+        List<POStore> reduceStores = new ArrayList<POStore>();
+        if(rp != null) {
+            reduceStores = PlanHelper.getStores(rp);    
+        }
+
+        // In the case of a single store in the job, we remove the store
+        // from the map/reduce plan, and if that store had an OutputFormat,
+        // we would have set it to be the Job's OutputFormat (relevant
+        // code in JobControlCompiler). So we only need to handle the
+        // multi-store case - to be safe, we check for a non-zero
+        // store count.
+        if(mapStores.size() > 0) {
+            for (POStore store : mapStores) {
+                checkOutputSpecsHelper(fs, store, job);
+            }
+        }
+        if(reduceStores.size() > 0) {
+            for (POStore store : reduceStores) {
+                checkOutputSpecsHelper(fs, store, job);
+            }
+        }
+    }
+
+    /**
+     * @param fs the FileSystem against which output specs are checked
+     * @param store the store whose StoreFunc may supply an OutputFormat
+     * @param job the job configuration
+     * @throws IOException if the delegated checkOutputSpecs() call fails
+     */
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void checkOutputSpecsHelper(FileSystem fs, POStore store, JobConf job)
+    throws IOException {
+        StoreFunc storeFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(
+                store.getSFile().getFuncSpec());
+        Class sPrepClass = null;
+        try {
+            sPrepClass = storeFunc.getStorePreparationClass();
+        } catch(AbstractMethodError e) {
+            // this is for backward compatibility wherein some old StoreFunc
+            // which does not implement getStorePreparationClass() is being
+            // used. In this case, we want to just use PigOutputFormat
+            sPrepClass = null;
+        }
+        if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+        
+            StoreConfig storeConfig = new StoreConfig(store.getSFile().
+                    getFileName(), store.getSchema(), store.getSortInfo());
+            // make a copy of the conf since we may be dealing with multiple
+            // stores. Set storeFunc and StoreConfig 
+            // pertaining to this store in the copy and use it
+            JobConf confCopy = new JobConf(job);
+            confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+                    store.getSFile().getFuncSpec().toString()));
+            confCopy.set(JobControlCompiler.PIG_STORE_CONFIG, 
+                    ObjectSerializer.serialize(storeConfig));
+            confCopy.setOutputFormat(sPrepClass);
+            OutputFormat of = confCopy.getOutputFormat();
+            of.checkOutputSpecs(fs, confCopy);
+        
+        }
+        
     }
 
     static public class PigRecordWriter implements

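To make the new delegation concrete, here is a minimal sketch of a StoreFunc
whose OutputFormat performs real validation - refusing to overwrite an
existing output location. The class is hypothetical (NoOverwriteStore and its
no-overwrite rule are invented for illustration, not part of this patch), but
it uses only APIs that appear in the diffs in this commit:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;

public class NoOverwriteStore implements StoreFunc {

    public void bindTo(OutputStream os) throws IOException {
        // actual write setup elided in this sketch
    }

    public void putNext(Tuple t) throws IOException {
        // actual tuple writing elided in this sketch
    }

    public void finish() throws IOException {
        // nothing to clean up in this sketch
    }

    // Returning an OutputFormat class here is what opts the StoreFunc
    // into the checkOutputSpecs() delegation added by this patch.
    @SuppressWarnings("unchecked")
    public Class getStorePreparationClass() throws IOException {
        return NoOverwriteOutputFormat.class;
    }

    public static class NoOverwriteOutputFormat
            implements OutputFormat<WritableComparable, Tuple> {

        public void checkOutputSpecs(FileSystem ignored, JobConf job)
                throws IOException {
            // checkOutputSpecsHelper() above placed this store's
            // StoreConfig into the JobConf copy before delegating here
            StoreConfig config = MapRedUtil.getStoreConfig(job);
            Path out = new Path(config.getLocation());
            FileSystem fs = FileSystem.get(job);
            if (fs.exists(out)) {
                throw new IOException("Output location " + out
                        + " already exists");
            }
        }

        public RecordWriter<WritableComparable, Tuple> getRecordWriter(
                FileSystem ignored, JobConf job, String name,
                Progressable progress) throws IOException {
            // record writing is handled by the StoreFunc in this sketch
            return null;
        }
    }
}

Used from a script like any other store function, e.g.
"store b into 'output1' using NoOverwriteStore();" - with this fix the
existence check now runs once per store before the job is submitted, instead
of being skipped whenever a job carries more than one store.
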
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=831449&r1=831448&r2=831449&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri Oct 30 
20:40:31 2009
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
@@ -36,6 +37,8 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.util.ExecTools;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
@@ -45,6 +48,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -59,8 +63,14 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.util.Progressable;
 
 public class TestMultiQuery extends TestCase {
 
@@ -1904,7 +1914,117 @@
             Assert.fail();
         } 
     }
+    
+    /**
+     * Test that Pig calls the checkOutputSpecs() method of the OutputFormat
+     * if the StoreFunc defines an OutputFormat as the return value of
+     * {@link StoreFunc#getStorePreparationClass()}.
+     * @throws IOException
+     */
+    @Test
+    public void testMultiStoreWithOutputFormat() throws IOException {
+        Util.createInputFile(cluster, "input.txt", new String[] {"hello", "bye"});
+        String query = "a = load 'input.txt';" +
+                       "b = filter a by $0 < 10;" +
+                       "store b into 'output1' using 
"+DummyStoreWithOutputFormat.class.getName()+"();" +
+                       "c = group a by $0;" +
+                       "d = foreach c generate group, COUNT(a.$0);" +
+                       "store d into 'output2' using 
"+DummyStoreWithOutputFormat.class.getName()+"();" ;
+        myPig.setBatchOn();
+        Util.registerMultiLineQuery(myPig, query);
+        myPig.executeBatch();
+        
+        // check that files were created as a result of the
+        // checkOutputSpecs() method of the OutputFormat being called
+        FileSystem fs = cluster.getFileSystem();
+        assertTrue(fs.exists(new Path("output1_checkOutputSpec_test")));
+        assertTrue(fs.exists(new Path("output2_checkOutputSpec_test")));
+        Util.deleteFile(cluster, "input.txt");
+        Util.deleteFile(cluster, "output1_checkOutputSpec_test");
+        Util.deleteFile(cluster, "output2_checkOutputSpec_test");
+    }
+
+    public static class DummyStoreWithOutputFormat implements StoreFunc {
+        
+        /**
+         * Default constructor - required since Pig instantiates the
+         * StoreFunc from its FuncSpec by reflection.
+         */
+        public DummyStoreWithOutputFormat() {
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#bindTo(java.io.OutputStream)
+         */
+        @Override
+        public void bindTo(OutputStream os) throws IOException {
+            // no-op: this dummy store never writes any data
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#finish()
+         */
+        @Override
+        public void finish() throws IOException {
+            // no-op: nothing to clean up
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        public Class getStorePreparationClass() throws IOException {
+            return DummyOutputFormat.class;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            // no-op: tuples are discarded
+        }
+                
+    }
+    
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public static class DummyOutputFormat
+    implements OutputFormat<WritableComparable, Tuple> {
+
+        public DummyOutputFormat() {
+        }
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapred.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf)
+         */
+        @Override
+        public void checkOutputSpecs(FileSystem ignored, JobConf job)
+                throws IOException {
+            StoreConfig sConfig = MapRedUtil.getStoreConfig(job);
+            FileSystem fs = FileSystem.get(job);
+            // create a file to test that this method got called
+            fs.create(new Path(sConfig.getLocation() + "_checkOutputSpec_test"));
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapred.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
+         */
+        @Override
+        public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+                FileSystem ignored, JobConf job, String name,
+                Progressable progress) throws IOException {
+            // not exercised by this test
+            return null;
+        }
+        
+    }
 
    // --------------------------------------------------------------------------
     // Helper methods
 

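To run just the new test, an invocation along the lines of
"ant test -Dtestcase=TestMultiQuery" from a trunk checkout should work,
assuming the usual Pig build targets (the exact command is an assumption,
not something stated in this commit).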
