I don't think it will help much. In a Hadoop cluster, people already set the
number of "SLOTS" to the number of cores, so the inherent parallelism is
presumably already exploited, since different mappers/reducers are
completely independent.


On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:

> Dears,
>
> I suddenly got this idea to run MapReduce jobs in a multi-threaded way.
> I don't know if it can work. Could anyone give me some advice?
> Here is the java code:
>
>
> import java.io.IOException;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>
> public class LogProcessApp extends Thread {
>
>  private static String sep;
>  private String x2;
>  private String x3;
>
>  public LogProcessApp(String arg1,String arg2,String arg3){
>   sep=arg1;
>   x2=arg2;
>   x3=arg3;
>  }
>
>  public static class CM extends Mapper<LongWritable, Text, Text, Text>{
>   private Text keyvar = new Text();
>   private Text valvar = new Text();
>   public void map(LongWritable key, Text value, Context context)
>     throws IOException, InterruptedException {
>    String line = value.toString();
>    try{
>     String data[] = line.split(sep);
>     keyvar.set(data[0]);
>     valvar.set(data[1]);
>     context.write(keyvar,valvar);
>    } catch (Exception e) {
>     return;
>    }
>   }
>  }
>
>  public void run(){
>   Configuration conf = new Configuration();
>   Job job = null;
>   try {
>    job = new Job(conf);
>   } catch (IOException e1) {
>    // TODO Auto-generated catch block
>    e1.printStackTrace();
>   }
>
>   job.setJobName("XXXJob");
>   job.setJarByClass(CMR.class);
>
>   job.setOutputKeyClass(Text.class);
>   job.setOutputValueClass(Text.class);
>
>   job.setMapperClass(CM.class);
>
>   job.setInputFormatClass(TextInputFormat.class);
>   job.setOutputFormatClass(TextOutputFormat.class);
>
>   try {
>    FileInputFormat.addInputPath(job, new Path(x2));
>   } catch (IOException e) {
>    // TODO Auto-generated catch block
>    e.printStackTrace();
>   }
>   FileOutputFormat.setOutputPath(job, new Path(x3));
>
>   try {
>    job.submit();
>   } catch (IOException e) {
>    // TODO Auto-generated catch block
>    e.printStackTrace();
>   } catch (InterruptedException e) {
>    // TODO Auto-generated catch block
>    e.printStackTrace();
>   } catch (ClassNotFoundException e) {
>    // TODO Auto-generated catch block
>    e.printStackTrace();
>   }
>
>  }
>
>
>  public static void main(String args[]){
>   LogProcessApp lpa1=new LogProcessApp(args[0],args[1],args[3]);
>   LogProcessApp lpa2=new LogProcessApp(args[4],args[5],args[6]);
>   LogProcessApp lpa3=new LogProcessApp(args[7],args[8],args[9]);
>   lpa1.start();
>   lpa2.start();
>   lpa3.start();
>  }
> }
>

Reply via email to