Hi,

>> so I wanted to try and lower the number to 10 and see how the performance is

The number of mappers is provided only as a hint to the framework; it is not guaranteed to be that number.

>> I have been digging around in the hadoop source code and it looks like the
>> JobClient actually sets the mappers to the number of splits (hard coded)

The number of splits is determined by your InputFormat (or, more precisely, its getSplits() method), and the framework spawns one mapper task per split. So you essentially do have control over the number of map tasks. The 1:1 split-to-mapper correspondence assumes you take care of data locality optimizations, if any, while generating splits (like in CombineFileInputFormat).
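Since your chunk-compressed files cannot be split, the usual way to cut the task count is the reverse: pack many whole files into each split. A rough sketch with the new-API CombineFileInputFormat, assuming your Hadoop version ships it under org.apache.hadoop.mapreduce.lib.input (the class name and MyChunkRecordReader below are placeholders for your own code):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class PackedFileInputFormat
    extends CombineFileInputFormat<LongWritable, BytesWritable> {

  public PackedFileInputFormat() {
    // Pack whole files together until a split reaches ~64 MB;
    // fewer, larger splits means fewer map tasks.
    setMaxSplitSize(64L * 1024 * 1024);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Chunk-compressed files must be processed whole.
    return false;
  }

  @Override
  public RecordReader<LongWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader runs your per-file reader (a placeholder here)
    // over each file packed into the combined split.
    return new CombineFileRecordReader<LongWritable, BytesWritable>(
        (CombineFileSplit) split, context, MyChunkRecordReader.class);
  }
}

You would then pass it with job.setInputFormatClass(PackedFileInputFormat.class) and tune setMaxSplitSize() until you land near the ~10 tasks you are after.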
>> Is there anything I can do to get the number of mappers to be more flexible?

As Jeff suggested, use the available library of input formats, or for more control, write your own :)
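One more thing I noticed in your main(): setNumMapTasks() is a method on the old-API JobConf, not on Configuration, and (if I am reading the 0.20 Job constructor right) Job copies the configuration when it is constructed, so changes made to conf after new Job(conf, ...) never reach the job anyway. If you want to pass the hint at all, set it before creating the Job, e.g.:

Configuration conf = new Configuration();
conf.setInt("mapred.map.tasks", 10); // still only a hint; the split count wins
Job job = new Job(conf, "word count");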
Thanks,
Amogh

On 1/19/10 2:35 AM, "Teryl Taylor" <teryl.tay...@gmail.com> wrote:

Hi everyone,

I'm playing around with the Hadoop map reduce library and I'm getting some odd behaviour. The system is set up on one machine using the pseudo-distributed configuration, and I use KFS as my DFS. I have written a MapReduce program to process a bunch of binary files. The files are compressed in chunks, so I do not split the files. There are 1083 files that I am loading into the map reducer. Every time I run the map reducer:

~/hadoop/bin/hadoop jar /home/hadoop/lib/test.jar org.apache.hadoop.mapreduce.apps.Test /input/2007/*/*/ /output

it always creates 1083 mapper tasks to do the processing, which is extremely slow, so I wanted to try and lower the number to 10 and see how the performance is. I set the following in mapred-site.xml:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>10</value>
  </property>
</configuration>

I have recycled the jobtracker and tasktracker and I still always get 1083 mappers. Other than this, the map reducer is working as expected. I'm using the new API, and the main function in my class looks like:

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs =
      new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  // conf.setInt("mapred.map.tasks", 10);
  Job job = new Job(conf, "word count");
  conf.setNumMapTasks(10);
  job.setJarByClass(Test.class);
  job.setMapperClass(TestMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setInputFormatClass(CustomInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

I have been digging around in the hadoop source code and it looks like the JobClient actually sets the mappers to the number of splits (hard coded). A snippet from the JobClient class:

****************************************

/**
 * Internal method for submitting jobs to the system.
 * @param job the configuration to submit
 * @return a proxy object for the running job
 * @throws FileNotFoundException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
public RunningJob submitJobInternal(JobConf job)
    throws FileNotFoundException, ClassNotFoundException,
           InterruptedException, IOException {
  /*
   * configure the command line options correctly on the submitting dfs
   */
  JobID jobId = jobSubmitClient.getNewJobId();
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  int reduces = job.getNumReduceTasks();
  JobContext context = new JobContext(job, jobId);

  // Check the output specification
  if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
    org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
    output.checkOutputSpecs(context);
  } else {
    job.getOutputFormat().checkOutputSpecs(fs, job);
  }

  // Create the splits for the job
  LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  int maps;
  if (job.getUseNewMapper()) {
    maps = writeNewSplits(context, submitSplitFile);
  } else {
    maps = writeOldSplits(job, submitSplitFile);
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  job.setNumMapTasks(maps);

  // Write job file to JobTracker's fs
  FSDataOutputStream out =
      FileSystem.create(fs, submitJobFile,
          new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  ...
}

****************************************

Is there anything I can do to get the number of mappers to be more flexible?

Cheers,
Teryl