Author: olga Date: Thu Jul 29 00:24:37 2010 New Revision: 980274 URL: http://svn.apache.org/viewvc?rev=980274&view=rev Log: PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/conf/pig-default.properties hadoop/pig/trunk/conf/pig.properties hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=980274&r1=980273&r2=980274&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Jul 29 00:24:37 2010 @@ -22,6 +22,8 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES +PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL keyword (zjffdu via olgan) + IMPROVEMENTS PIG-928: UDFs in scripting languages (daijy) Modified: hadoop/pig/trunk/conf/pig-default.properties URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=980274&r1=980273&r2=980274&view=diff ============================================================================== --- hadoop/pig/trunk/conf/pig-default.properties (original) +++ hadoop/pig/trunk/conf/pig-default.properties Thu Jul 29 00:24:37 2010 @@ -21,3 +21,7 @@ pig.spill.size.threshold=5000000 #EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes) #This should help reduce the number of files being spilled. 
pig.spill.gc.activation.size=40000000 + +#the following two parameters are to help estimate the reducer number +pig.exec.reducers.bytes.per.reducer=1000000000 +pig.exec.reducers.max=999 \ No newline at end of file Modified: hadoop/pig/trunk/conf/pig.properties URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig.properties?rev=980274&r1=980273&r2=980274&view=diff ============================================================================== --- hadoop/pig/trunk/conf/pig.properties (original) +++ hadoop/pig/trunk/conf/pig.properties Thu Jul 29 00:24:37 2010 @@ -22,3 +22,7 @@ #EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes) #This should help reduce the number of files being spilled. #pig.spill.gc.activation.size=40000000 + +#the following two parameters are to help estimate the reducer number +#pig.exec.reducers.bytes.per.reducer=1000000000 +#pig.exec.reducers.max=999 \ No newline at end of file Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=980274&r1=980273&r2=980274&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Jul 29 00:24:37 2010 @@ -339,10 +339,6 @@ public class JobControlCompiler{ ss.addSettingsToConf(mro, conf); } - //Set the User Name for this job. 
This will be -used as the working directory - if (pigContext.defaultParallel > 0) - conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel); conf.set("mapred.mapper.new-api", "true"); conf.set("mapred.reducer.new-api", "true"); @@ -533,8 +529,15 @@ public class JobControlCompiler{ mro.reducePlan.remove(pack); nwJob.setMapperClass(PigMapReduce.Map.class); nwJob.setReducerClass(PigMapReduce.Reduce.class); - if (mro.requestedParallelism>0) + + // first check the PARALLEL in the query, then check the defaultParallel in PigContext, and last do estimation + if (mro.requestedParallelism > 0) nwJob.setNumReduceTasks(mro.requestedParallelism); + else if (pigContext.defaultParallel > 0) + conf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel); + else + estimateNumberOfReducers(conf,lds); + if (mro.customPartitioner != null) nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner)); @@ -642,6 +645,75 @@ public class JobControlCompiler{ } } + /** + * Currently the estimation of reducer number is only applied to HDFS. The estimation is based on the input size of data stored on HDFS. + * Two parameters can be configured for the estimation: one is pig.exec.reducers.max, which constrains the maximum number of reducer tasks (default is 999). The other + * is pig.exec.reducers.bytes.per.reducer (default value is 1000*1000*1000), which means how much data can be handled by each reducer. + * e.g. the following is your pig script + * a = load '/data/a'; + * b = load '/data/b'; + * c = join a by $0, b by $0; + * store c into '/tmp'; + * + * The size of /data/a is 1000*1000*1000, and size of /data/b is 2*1000*1000*1000. 
+ * Then the estimated reducer number is (1000*1000*1000+2*1000*1000*1000)/(1000*1000*1000)=3 + * @param conf + * @param lds + * @throws IOException + */ + private void estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException { + long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)); + int maxReducers = conf.getInt("pig.exec.reducers.max", 999); + long totalInputFileSize = getTotalInputFileSize(lds); + + log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + + maxReducers + " totalInputFileSize=" + totalInputFileSize); + + int reducers = (int)Math.ceil((totalInputFileSize+0.0) / bytesPerReducer); + reducers = Math.max(1, reducers); + reducers = Math.min(maxReducers, reducers); + conf.setInt("mapred.reduce.tasks", reducers); + + log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers); + } + + private long getTotalInputFileSize(List<POLoad> lds) throws IOException { + List<String> inputs = new ArrayList<String>(); + if(lds!=null && lds.size()>0){ + for (POLoad ld : lds) { + inputs.add(ld.getLFile().getFileName()); + } + } + long size = 0; + FileSystem fs = FileSystem.get(conf); + for (String input : inputs){ + Path path = new Path(input); + String schema = path.toUri().getScheme(); + if (schema==null || schema.equalsIgnoreCase("hdfs") || schema.equalsIgnoreCase("file")){ + FileStatus[] status=fs.globStatus(new Path(input)); + if (status != null){ + for (FileStatus s : status){ + size += getPathLength(fs, s); + } + } + } + } + return size; + } + + private long getPathLength(FileSystem fs,FileStatus status) throws IOException{ + if (!status.isDir()){ + return status.getLen(); + }else{ + FileStatus[] children = fs.listStatus(status.getPath()); + long size=0; + for (FileStatus child : children){ + size +=getPathLength(fs, child); + } + return size; + } + } + public static class PigSecondaryKeyGroupComparator extends 
WritableComparator { public PigSecondaryKeyGroupComparator() { // super(TupleFactory.getInstance().tupleClass(), true); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=980274&r1=980273&r2=980274&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Jul 29 00:24:37 2010 @@ -545,6 +545,68 @@ public class TestJobSubmission extends j pc.defaultParallel = -1; } + @Test + public void testReducerNumEstimation() throws Exception{ + // use the estimation + LogicalPlanTester planTester = new LogicalPlanTester(pc) ; + Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd"); + planTester.buildPlan("a = load '/passwd';"); + LogicalPlan lp = planTester.buildPlan("b = group a by $0;"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + POStore store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100"); + pc.getConf().setProperty("pig.exec.reducers.max", "10"); + HExecutionEngine exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); + JobControlCompiler jcc = new JobControlCompiler(pc, conf); + JobControl jc=jcc.compile(mrPlan, "Test"); + Job job = jc.getWaitingJobs().get(0); + long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10); + assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), reducer); + + // use the PARALLEL key word, it will override the estimated reducer number + planTester = new LogicalPlanTester(pc) ; + 
planTester.buildPlan("a = load '/passwd';"); + lp = planTester.buildPlan("b = group a by $0 PARALLEL 2;"); + pp = Util.buildPhysicalPlan(lp, pc); + store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + mrPlan = Util.buildMRPlan(pp, pc); + + pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100"); + pc.getConf().setProperty("pig.exec.reducers.max", "10"); + exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); + jcc = new JobControlCompiler(pc, conf); + jc=jcc.compile(mrPlan, "Test"); + job = jc.getWaitingJobs().get(0); + assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 2); + + // the estimation won't take effect when it applies to non-dfs files or files that don't exist, such as hbase + planTester = new LogicalPlanTester(pc) ; + planTester.buildPlan("a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');"); + lp = planTester.buildPlan("b = group a by $0 ;"); + pp = Util.buildPhysicalPlan(lp, pc); + store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + mrPlan = Util.buildMRPlan(pp, pc); + + pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100"); + pc.getConf().setProperty("pig.exec.reducers.max", "10"); + exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); + jcc = new JobControlCompiler(pc, conf); + jc=jcc.compile(mrPlan, "Test"); + job = jc.getWaitingJobs().get(0); + assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 1); + } + private void submit() throws Exception{ assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc)); MapReduceLauncher mrl = new MapReduceLauncher();