Exactly - A job is already designed to be properly parallel w.r.t. its
input, and this would just add additional overheads of job setup and
scheduling. If your per-record processing requires threaded work,
consider using the MultithreadedMapper/Reducer classes instead.

On Wed, Dec 12, 2012 at 10:53 PM, Yang <teddyyyy...@gmail.com> wrote:
> I think it won't help much. In a Hadoop cluster, people already set the
> number of "SLOTS" to the number of cores, so the inherent parallelism is
> presumably already exploited, since different mappers/reducers are
> completely independent.
>
>
> On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
>>
>> Dears,
>>
>> I suddenly got this idea to run MapReduce jobs in a multi-threaded way.
>> I don't know if it can work. Could anyone give me some advice?
>> Here is the Java code:
>>
>>
>> import java.io.IOException;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.hadoop.mapreduce.Mapper;
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>
>> public class LogProcessApp extends Thread {
>>
>>  private static String sep;
>>  private String x2;
>>  private String x3;
>>
>>  public LogProcessApp(String arg1,String arg2,String arg3){
>>   sep=arg1;
>>   x2=arg2;
>>   x3=arg3;
>>  }
>>
>>  public static class CM extends Mapper<LongWritable, Text, Text, Text>{
>>   private Text keyvar = new Text();
>>   private Text valvar = new Text();
>>   public void map(LongWritable key, Text value, Context context)
>>     throws IOException, InterruptedException {
>>    String line = value.toString();
>>    try{
>>     String data[] = line.split(sep);
>>     keyvar.set(data[0]);
>>     valvar.set(data[1]);
>>     context.write(keyvar,valvar);
>>    } catch (Exception e) {
>>     return;
>>    }
>>   }
>>  }
>>
>>  public void run(){
>>   Configuration conf = new Configuration();
>>   Job job = null;
>>   try {
>>    job = new Job(conf);
>>   } catch (IOException e1) {
>>    // TODO Auto-generated catch block
>>    e1.printStackTrace();
>>   }
>>
>>   job.setJobName("XXXJob");
>>   job.setJarByClass(CMR.class);
>>
>>   job.setOutputKeyClass(Text.class);
>>   job.setOutputValueClass(Text.class);
>>
>>   job.setMapperClass(CM.class);
>>
>>   job.setInputFormatClass(TextInputFormat.class);
>>   job.setOutputFormatClass(TextOutputFormat.class);
>>
>>   try {
>>    FileInputFormat.addInputPath(job, new Path(x2));
>>   } catch (IOException e) {
>>    // TODO Auto-generated catch block
>>    e.printStackTrace();
>>   }
>>   FileOutputFormat.setOutputPath(job, new Path(x3));
>>
>>   try {
>>    job.submit();
>>   } catch (IOException e) {
>>    // TODO Auto-generated catch block
>>    e.printStackTrace();
>>   } catch (InterruptedException e) {
>>    // TODO Auto-generated catch block
>>    e.printStackTrace();
>>   } catch (ClassNotFoundException e) {
>>    // TODO Auto-generated catch block
>>    e.printStackTrace();
>>   }
>>
>>  }
>>
>>
>>  public static void main(String args[]){
>>   LogProcessApp lpa1=new LogProcessApp(args[0],args[1],args[3]);
>>   LogProcessApp lpa2=new LogProcessApp(args[4],args[5],args[6]);
>>   LogProcessApp lpa3=new LogProcessApp(args[7],args[8],args[9]);
>>   lpa1.start();
>>   lpa2.start();
>>   lpa3.start();
>>  }
>> }
>
>



-- 
Harsh J

Reply via email to