Author: pradeepkth
Date: Fri Jun 4 16:05:40 2010
New Revision: 951469

URL: http://svn.apache.org/viewvc?rev=951469&view=rev
Log:
PIG-1433: pig should create success file if mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=951469&r1=951468&r2=951469&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Jun 4 16:05:40 2010 @@ -189,6 +189,9 @@ OPTIMIZATIONS BUG FIXES +PIG-1433: pig should create success file if +mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth) + PIG-1415: LoadFunc signature is not correct in LoadFunc.getSchema sometimes (daijy) PIG-1403: Make Pig work with remote HDFS in secure mode (daijy) Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=951469&r1=951468&r2=951469&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun 4 16:05:40 2010 @@ -29,6 +29,8 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -77,6 
+79,11 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; private Map<FileSpec, Exception> failureMap; + public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; + + public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = + "mapreduce.fileoutputcommitter.marksuccessfuljobs"; + /** * Get the exception that caused a failure on the backend for a * store location (if any). @@ -304,6 +311,9 @@ public class MapReduceLauncher extends L } if (!st.isTmpStore()) { succeededStores.add(st); + // create an "_SUCCESS" file in output location if + // output location is a filesystem dir + createSuccessFile(job, st); finalStores++; log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\""); } @@ -488,6 +498,24 @@ public class MapReduceLauncher extends L PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration()); } + private boolean shouldMarkOutputDir(Job job) { + return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + false); + } + + private void createSuccessFile(Job job, POStore store) throws IOException { + if(shouldMarkOutputDir(job)) { + FileSystem fs = FileSystem.get(job.getJobConf()); + Path outputPath = new Path(store.getSFile().getFileName()); + if(fs.exists(outputPath)){ + // create a file in the folder to mark it + Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); + if(!fs.exists(filePath)) { + fs.create(filePath).close(); + } + } + } + } /** * An exception handler class to handle exceptions thrown by the job controller thread Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java?rev=951469&r1=951468&r2=951469&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java (original) +++ 
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java Fri Jun 4 16:05:40 2010 @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -55,6 +56,7 @@ import org.apache.pig.data.DefaultBagFac import org.apache.pig.data.DefaultTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.LOStore; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -89,7 +91,10 @@ public class TestStore extends junit.fra String inputFileName; String outputFileName; - + + + private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; + @Override @Before public void setUp() throws Exception { @@ -557,6 +562,170 @@ public class TestStore extends junit.fra Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2); } } + + // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs" + // property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. 
+ @Test + public void testSuccessFileCreation1() throws Exception { + PigServer ps = null; + String[] files = new String[] { inputFileName, + outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"}; + try { + ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName + "_1';" ; + + for (ExecType execType : modes) { + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? multiStoreScript : + singleStoreScript); + // since we will be switching between map red and local modes + // we will need to make sure filelocalizer is reset before each + // run. + FileLocalizer.setInitialized(false); + if(execType == ExecType.MAPREDUCE) { + ps = new PigServer(ExecType.MAPREDUCE, + cluster.getProperties()); + } else { + Properties props = new Properties(); + props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); + ps = new PigServer(ExecType.LOCAL, props); + } + ps.getPigContext().getProperties().setProperty( + MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + Boolean.toString(isPropertySet)); + cleanupFiles(ps, files); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + for(int i = 1; i <= (isMultiStore ? 
3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + execType + " mode", isPropertySet, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } + } finally { + cleanupFiles(ps, files); + } + } + + // Test _SUCCESS file is NOT created when job fails and when + // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. + @Test + public void testSuccessFileCreation2() throws Exception { + PigServer ps = null; + String[] files = new String[] { inputFileName, + outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"}; + try { + ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + System.err.println("XXX: " + TestStore.FailUDF.class.getName()); + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "b = foreach b generate " + FailUDF.class.getName() + "($0);" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "b = foreach a generate " + FailUDF.class.getName() + "($0);" + + "store b into '" + outputFileName + "_1';" ; + + for (ExecType execType : modes) { + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? 
multiStoreScript : + singleStoreScript); + // since we will be switching between map red and local modes + // we will need to make sure filelocalizer is reset before each + // run. + FileLocalizer.setInitialized(false); + if(execType == ExecType.MAPREDUCE) { + // since the job is guaranteed to fail, let's set + // number of retries to 1. + Properties props = cluster.getProperties(); + props.setProperty(MAP_MAX_ATTEMPTS, "1"); + ps = new PigServer(ExecType.MAPREDUCE, props); + } else { + Properties props = new Properties(); + props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); + // since the job is guaranteed to fail, let's set + // number of retries to 1. + props.setProperty(MAP_MAX_ATTEMPTS, "1"); + ps = new PigServer(ExecType.LOCAL, props); + } + ps.getPigContext().getProperties().setProperty( + MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + Boolean.toString(isPropertySet)); + cleanupFiles(ps, files); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + try { + ps.executeBatch(); + } catch(IOException ioe) { + if(!ioe.getMessage().equals("FailUDFException")) { + // an unexpected exception + throw ioe; + } + } + for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + execType + " mode", false, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } + } finally { + cleanupFiles(ps, files); + } + } + + // A UDF which always throws an Exception so that the job can fail + public static class FailUDF extends EvalFunc<String> { + + @Override + public String exec(Tuple input) throws IOException { + throw new IOException("FailUDFException"); + } + + } + private void cleanupFiles(PigServer ps, String... 
files) throws IOException { + for(String file:files) { + Util.deleteFile(ps.getPigContext(), file); + } + } + + public static class DummyStore extends PigStorage implements StoreMetadata{ private boolean failInPutNext = false; @@ -618,7 +787,6 @@ public class TestStore extends junit.fra @Override public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException { - // TODO Auto-generated method stub } }