Exactly - a job is already designed to run in parallel across its input, so wrapping it in threads would only add the overhead of extra job setup and scheduling. If your per-record processing requires threaded work, consider using the MultithreadedMapper/Reducer classes instead; a rough sketch follows.
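
Something along these lines (untested sketch; the mapper, thread count, and paths are placeholders, not from the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultithreadedExample {

    // The per-record work lives in an ordinary Mapper; MultithreadedMapper
    // runs several instances of it concurrently inside a single map task,
    // so the map() code must be thread-safe.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // ... expensive per-record work goes here ...
            context.write(new Text("key"), value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "multithreaded-example");
        job.setJarByClass(MultithreadedExample.class);

        // Wrap MyMapper in MultithreadedMapper and run 8 threads per map task.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, MyMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 8);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This only helps when each record is expensive to process (e.g. blocking I/O per record); for plain CPU-bound maps the existing slots-per-core parallelism already saturates the machine.
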
On Wed, Dec 12, 2012 at 10:53 PM, Yang <teddyyyy...@gmail.com> wrote:
> I think it won't help much, since in a hadoop cluster, people already
> allocate "SLOTS" to be the number of cores, supposedly the inherent
> parallelism can be already exploited, since different mappers/reducers are
> completely independent.
>
>
> On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
>>
>> Dears,
>>
>> I suddenly got this idea to do mapreduce job in a muti-thread way.
>> I don't know if it can work. Could anyone give me some advices?
>> Here is the java code:
>>
>>
>> import java.io.IOException;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>
>> public class LogProcessApp extends Thread {
>>
>>     private static String sep;
>>     private String x2;
>>     private String x3;
>>
>>     public LogProcessApp(String arg1, String arg2, String arg3) {
>>         sep = arg1;
>>         x2 = arg2;
>>         x3 = arg3;
>>     }
>>
>>     public static class CM extends Mapper<LongWritable, Text, Text, Text> {
>>         private Text keyvar = new Text();
>>         private Text valvar = new Text();
>>
>>         public void map(LongWritable key, Text value, Context context)
>>                 throws IOException, InterruptedException {
>>             String line = value.toString();
>>             try {
>>                 String data[] = line.split(sep);
>>                 keyvar.set(data[0]);
>>                 valvar.set(data[1]);
>>                 context.write(keyvar, valvar);
>>             } catch (Exception e) {
>>                 return;
>>             }
>>         }
>>     }
>>
>>     public void run() {
>>         Configuration conf = new Configuration();
>>         Job job = null;
>>         try {
>>             job = new Job(conf);
>>         } catch (IOException e1) {
>>             // TODO Auto-generated catch block
>>             e1.printStackTrace();
>>         }
>>
>>         job.setJobName("XXXJob");
>>         job.setJarByClass(CMR.class);
>>
>>         job.setOutputKeyClass(Text.class);
>>         job.setOutputValueClass(Text.class);
>>
>>         job.setMapperClass(CM.class);
>>
>>         job.setInputFormatClass(TextInputFormat.class);
>>         job.setOutputFormatClass(TextOutputFormat.class);
>>
>>         try {
>>             FileInputFormat.addInputPath(job, new Path(x2));
>>         } catch (IOException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         }
>>         FileOutputFormat.setOutputPath(job, new Path(x3));
>>
>>         try {
>>             job.submit();
>>         } catch (IOException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         } catch (InterruptedException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         } catch (ClassNotFoundException e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>>
>>     public static void main(String args[]) {
>>         LogProcessApp lpa1 = new LogProcessApp(args[0], args[1], args[3]);
>>         LogProcessApp lpa2 = new LogProcessApp(args[4], args[5], args[6]);
>>         LogProcessApp lpa3 = new LogProcessApp(args[7], args[8], args[9]);
>>         lpa1.start();
>>         lpa2.start();
>>         lpa3.start();
>>     }
>> }
>
>

--
Harsh J