Author: pradeepkth
Date: Fri Jun  4 00:45:24 2010
New Revision: 951229

URL: http://svn.apache.org/viewvc?rev=951229&view=rev
Log:
PIG-1433: pig should create success file if 
mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun  4 00:45:24 2010
@@ -70,6 +70,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1433: pig should create success file if
+mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)
+
 PIG-1347: Clear up output directory for a failed job (daijy)
 
 PIG-1419: Remove "user.name" from JobConf (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Fri Jun  4 00:45:24 2010
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
@@ -79,6 +80,11 @@ public class MapReduceLauncher extends L
     private boolean aggregateWarning = false;
     private Map<FileSpec, Exception> failureMap;
 
+    /** Name of the empty marker file created inside a store's output location. */
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    
+    /**
+     * Job configuration key read as a boolean (default false); the marker
+     * file is only created when it is true.
+     */
+    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+    
     /**
      * Get the exception that caused a failure on the backend for a
      * store location (if any).
@@ -309,6 +315,9 @@ public class MapReduceLauncher extends L
                     }
                     if (!st.isTmpStore()) {
                         succeededStores.add(st);
+                        // create an "_SUCCESS" file in output location if 
+                        // output location is a filesystem dir
+                        createSuccessFile(job, st);
                         finalStores++;
                         if (st.isMultiStore()) {
                             String counterName = 
PigStatsUtil.getMultiStoreCounterName(st);
@@ -509,6 +518,24 @@ public class MapReduceLauncher extends L
         PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
     }
 
+    /**
+     * Tells whether the given job asks for its output locations to be marked
+     * on success.
+     *
+     * @param job the job whose configuration is consulted
+     * @return the boolean value of {@code SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}
+     *         in the job configuration, or false when it is not set
+     */
+    private boolean shouldMarkOutputDir(Job job) {
+        final boolean valueWhenUnset = false;
+        return job.getJobConf().getBoolean(
+                SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, valueWhenUnset);
+    }
+    
+    /**
+     * Writes an empty "_SUCCESS" marker file into the output location of a
+     * store, when the job configuration enables
+     * {@code SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}.
+     * <p>
+     * Nothing is written when the property is off, when the store location
+     * does not exist, or when the marker file is already present.
+     *
+     * @param job   the job whose configuration is consulted
+     * @param store the store whose output location should be marked
+     * @throws IOException if the file system cannot be resolved or accessed
+     */
+    private void createSuccessFile(Job job, POStore store) throws IOException {
+        if (!shouldMarkOutputDir(job)) {
+            return;
+        }
+        Path outputPath = new Path(store.getSFile().getFileName());
+        // Resolve the file system from the output path itself instead of
+        // taking the default file system: a fully qualified store location
+        // on another file system must be checked and marked where the
+        // output actually lives. Paths without a scheme still resolve to
+        // the default file system.
+        FileSystem fs = outputPath.getFileSystem(job.getJobConf());
+        if (!fs.exists(outputPath)) {
+            return;
+        }
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        if (!fs.exists(filePath)) {
+            // create an empty file in the folder to mark it
+            fs.create(filePath).close();
+        }
+    }
     
     /**
      * An exception handler class to handle exceptions thrown by the job 
controller thread

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Fri Jun  4 
00:45:24 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -55,6 +56,7 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -89,7 +91,14 @@ public class TestStore extends junit.fra
         
     String inputFileName;
     String outputFileName;
-        
+    
+    // Name of the nested DummyStore class as written into Pig scripts.
+    // NOTE(review): the "$" is presumably escaped so that script
+    // preprocessing does not treat it as a parameter reference - confirm.
+    private static final String DUMMY_STORE_CLASS_NAME
+    = "org.apache.pig.test.TestStore\\$DummyStore";
+
+    // Name of the nested FailUDF class as written into Pig scripts
+    // (same "$" escaping as above).
+    private static final String FAIL_UDF_NAME
+    = "org.apache.pig.test.TestStore\\$FailUDF";
+    // Property key the failing-job test sets to "1".
+    private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; 
+    
     @Override
     @Before
     public void setUp() throws Exception {
@@ -558,8 +567,168 @@ public class TestStore extends junit.fra
         }
     }
     
-    private static final String DUMMY_STORE_CLASS_NAME
-            = "org.apache.pig.test.TestStore\\$DummyStore";
+    // Test that "_SUCCESS" file is created when the
+    // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true.
+    // The test covers multi store and single store case in local and
+    // mapreduce mode.
+    // The test also checks that "_SUCCESS" file is NOT created when the
+    // property is not set to true in all the modes.
+    @Test
+    public void testSuccessFileCreation1() throws Exception {
+        PigServer pigServer = null;
+        final String[] touchedFiles = {
+                inputFileName,
+                outputFileName + "_1",
+                outputFileName + "_2",
+                outputFileName + "_3" };
+        try {
+            final String[] rows = { "hello\tworld", "hi\tworld", "bye\tworld" };
+            final boolean[] onThenOff = { true, false };
+
+            // one load fanned out into three filtered stores
+            final String threeStoreScript = "a = load '"+ inputFileName + "';"
+                    + "b = filter a by $0 == 'hello';"
+                    + "c = filter a by $0 == 'hi';"
+                    + "d = filter a by $0 == 'bye';"
+                    + "store b into '" + outputFileName + "_1';"
+                    + "store c into '" + outputFileName + "_2';"
+                    + "store d into '" + outputFileName + "_3';";
+
+            // one load written straight to a single store
+            final String oneStoreScript = "a = load '"+ inputFileName + "';"
+                    + "store a into '" + outputFileName + "_1';";
+
+            for (ExecType mode : new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE }) {
+                final String message =
+                        "Checking if _SUCCESS file exists in " + mode + " mode";
+                for (boolean markerEnabled : onThenOff) {
+                    for (boolean multiStore : onThenOff) {
+                        final int storeCount = multiStore ? 3 : 1;
+
+                        // the loops switch between local and map reduce
+                        // modes, so FileLocalizer is reset before every run
+                        FileLocalizer.setInitialized(false);
+                        if (mode == ExecType.MAPREDUCE) {
+                            pigServer = new PigServer(ExecType.MAPREDUCE,
+                                    cluster.getProperties());
+                        } else {
+                            Properties localProps = new Properties();
+                            localProps.setProperty(MapRedUtil.FILE_SYSTEM_NAME,
+                                    "file:///");
+                            pigServer = new PigServer(ExecType.LOCAL, localProps);
+                        }
+                        pigServer.getPigContext().getProperties().setProperty(
+                                MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+                                Boolean.toString(markerEnabled));
+
+                        // start from a clean slate, then run the script as a batch
+                        cleanupFiles(pigServer, touchedFiles);
+                        pigServer.setBatchOn();
+                        Util.createInputFile(pigServer.getPigContext(),
+                                inputFileName, rows);
+                        Util.registerMultiLineQuery(pigServer,
+                                multiStore ? threeStoreScript : oneStoreScript);
+                        pigServer.executeBatch();
+
+                        // the marker must exist exactly when the property is on
+                        for (int n = 1; n <= storeCount; n++) {
+                            String markerFile = outputFileName + "_" + n + "/"
+                                    + MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                            assertEquals(message, markerEnabled,
+                                    Util.exists(pigServer.getPigContext(), markerFile));
+                        }
+                    }
+                }
+            }
+        } finally {
+            cleanupFiles(pigServer, touchedFiles);
+        }
+    }
+
+    // Test that "_SUCCESS" file is NOT created when the job fails, even when the
+    // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true.
+    // The test covers multi store and single store case in local and
+    // mapreduce mode.
+    // The test also checks that "_SUCCESS" file is NOT created when the
+    // property is not set to true, in all the modes.
+    @Test
+    public void testSuccessFileCreation2() throws Exception {
+        PigServer ps = null;
+        String[] files = new String[] { inputFileName,
+                outputFileName + "_1", outputFileName + "_2",
+                outputFileName + "_3"};
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.LOCAL,
+                    ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld",
+                    "bye\tworld"};
+
+            // relation b goes through FailUDF, which always throws
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hello';" +
+                    "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
+                    "c = filter a by $0 == 'hi';" +
+                    "d = filter a by $0 == 'bye';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" +
+                    "store d into '" + outputFileName + "_3';";
+
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
+                "store b into '" + outputFileName + "_1';" ;
+
+            for (ExecType execType : modes) {
+                for(boolean isPropertySet: new boolean[] { true, false}) {
+                    for(boolean isMultiStore: new boolean[] { true, false}) {
+                        String script = (isMultiStore ? multiStoreScript :
+                            singleStoreScript);
+                        // since we will be switching between map red and
+                        // local modes we will need to make sure
+                        // filelocalizer is reset before each run.
+                        FileLocalizer.setInitialized(false);
+                        if(execType == ExecType.MAPREDUCE) {
+                            // since the job is guaranteed to fail, let's set
+                            // number of retries to 1.
+                            // NOTE(review): this writes into the Properties
+                            // object returned by the cluster; if that object
+                            // is shared, the setting leaks into other tests
+                            // - confirm and copy if so.
+                            Properties props = cluster.getProperties();
+                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
+                            ps = new PigServer(ExecType.MAPREDUCE, props);
+                        } else {
+                            Properties props = new Properties();
+                            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME,
+                                    "file:///");
+                            // since the job is guaranteed to fail, let's set
+                            // number of retries to 1.
+                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
+                            ps = new PigServer(ExecType.LOCAL, props);
+                        }
+                        ps.getPigContext().getProperties().setProperty(
+                                MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+                                Boolean.toString(isPropertySet));
+                        cleanupFiles(ps, files);
+                        ps.setBatchOn();
+                        Util.createInputFile(ps.getPigContext(),
+                                inputFileName, inputData);
+                        Util.registerMultiLineQuery(ps, script);
+                        try {
+                            ps.executeBatch();
+                        } catch(IOException ioe) {
+                            // Constant-first comparison: an IOException
+                            // without a message must be rethrown as
+                            // unexpected, not turned into a
+                            // NullPointerException.
+                            if(!"FailUDFException".equals(ioe.getMessage())) {
+                                // an unexpected exception
+                                throw ioe;
+                            }
+                        }
+                        // the job failed, so no store may carry the marker
+                        for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                            String sucFile = outputFileName + "_" + i + "/" +
+                                    MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                            assertEquals("Checking if _SUCCESS file exists in "
+                                    + execType + " mode", false,
+                                    Util.exists(ps.getPigContext(), sucFile));
+                        }
+                    }
+                }
+            }
+        } finally {
+            cleanupFiles(ps, files);
+        }
+    }
+
+    // A UDF which always throws an Exception so that the job can fail
+    /**
+     * Eval function whose {@code exec} never returns: every call throws an
+     * IOException with the message "FailUDFException".
+     */
+    public static class FailUDF extends EvalFunc<String> {
+        @Override
+        public String exec(Tuple input) throws IOException {
+            final IOException failure = new IOException("FailUDFException");
+            throw failure;
+        }
+    }
+    /**
+     * Deletes the given files through the supplied server's PigContext.
+     *
+     * @param ps    server whose context is used for the deletion; may be
+     *              null, in which case nothing is deleted
+     * @param files names of the files to delete
+     * @throws IOException if the delete helper reports a failure
+     */
+    private void cleanupFiles(PigServer ps, String... files) throws
+            IOException {
+        if (ps == null) {
+            // The tests call this from finally blocks where the server may
+            // never have been created; returning here keeps the original
+            // failure from being masked by a NullPointerException.
+            return;
+        }
+        for (String file : files) {
+            Util.deleteFile(ps.getPigContext(), file);
+        }
+    }
+    
     
     public static class DummyStore extends PigStorage implements StoreMetadata{
 
@@ -622,7 +791,6 @@ public class TestStore extends junit.fra
         @Override
         public void storeStatistics(ResourceStatistics stats, String location,
                 Job job) throws IOException {
-            // TODO Auto-generated method stub
             
         }
     }


Reply via email to