I suppose you could also pass the separator through the job configuration,
or use a per-input mapper implementation via MultipleInputs, to do this.
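
One reason the configuration route matters for the code quoted below: `sep`
is a static field, so the three LogProcessApp instances share it and each
constructor call overwrites the previous value. On a real cluster the map
tasks also run in separate JVMs, so a static set in the driver is not visible
to them at all. What follows is a minimal plain-Java sketch of the first
problem only, not Hadoop code. SeparatorDemo, splitRecord and the
"log.separator" key are hypothetical names, and java.util.Properties merely
stands in for a per-job Configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Plain-Java illustration only; no Hadoop classes are used.
// Properties stands in for a per-job Configuration (an assumption of this sketch).
class SeparatorDemo {

    // Mirrors the posted code: one static field shared by every instance.
    static String sharedSep;

    // Per-job settings, playing the role of the job configuration.
    final Properties jobConf = new Properties();

    SeparatorDemo(String sep) {
        sharedSep = sep;                            // overwritten by each new instance
        jobConf.setProperty("log.separator", sep);  // hypothetical key name
    }

    // Split on the literal separator; String.split alone would treat it as a regex.
    static String[] splitRecord(String line, String sep) {
        return line.split(Pattern.quote(sep), 2);
    }

    public static void main(String[] args) {
        // Three "jobs", each meant to use its own separator.
        List<SeparatorDemo> jobs = new ArrayList<>();
        for (String sep : new String[] {",", "\t", "|"}) {
            jobs.add(new SeparatorDemo(sep));
        }
        String[] lines = {"a,1", "b\t2", "c|3"};
        for (int i = 0; i < jobs.size(); i++) {
            String own = jobs.get(i).jobConf.getProperty("log.separator");
            String[] viaConf = splitRecord(lines[i], own);
            String[] viaStatic = splitRecord(lines[i], sharedSep);
            System.out.println(viaConf.length + " fields via per-job setting, "
                    + viaStatic.length + " via static field");
        }
    }
}
```

Only the last job parses its line correctly through the static field, while
every per-job setting stays intact. In the real job the analogous move would
presumably be to set the value on the job's Configuration before submission
and read it back in the mapper's setup().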

On Thu, Dec 13, 2012 at 5:44 PM, Yu Yang <clouder...@gmail.com> wrote:
> Thank you all. In fact, I don't expect this approach to improve
> performance.
> I need to process 3 different logs (each with a different format). I just
> want to start processing all 3 logs at the same time, all in this one
> program. This way I can give each thread a different separator, so that its
> maps handle a different log.
>
>
> 2012/12/13 Yang <teddyyyy...@gmail.com>
>>
>> But I have run across some situations where I could benefit from
>> multi-threading: if your Hadoop mapper is prone to random-access IO (such as
>> looking up a TFile or HBase, which ultimately makes a network call and then
>> looks into a file segment), having multiple threads could utilize the CPU
>> while IO is in progress.
>>
>>
>> On Wed, Dec 12, 2012 at 10:47 AM, Harsh J <ha...@cloudera.com> wrote:
>>>
>>> Exactly - A job is already designed to be properly parallel w.r.t. its
>>> input, and this would just add additional overheads of job setup and
>>> scheduling. If your per-record processing requires threaded work,
>>> consider using the MultithreadedMapper/Reducer classes instead.
>>>
>>> On Wed, Dec 12, 2012 at 10:53 PM, Yang <teddyyyy...@gmail.com> wrote:
>>> > I think it won't help much. In a Hadoop cluster, people already
>>> > set the number of "SLOTS" to the number of cores, so the inherent
>>> > parallelism is presumably already exploited, since different
>>> > mappers/reducers are completely independent.
>>> >
>>> >
>>> > On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
>>> >>
>>> >> Dears,
>>> >>
>>> >> I suddenly got this idea to run a MapReduce job in a multi-threaded way.
>>> >> I don't know if it can work. Could anyone give me some advice?
>>> >> Here is the java code:
>>> >>
>>> >>
>>> >> import java.io.IOException;
>>> >> import org.apache.hadoop.conf.Configuration;
>>> >> import org.apache.hadoop.fs.Path;
>>> >> import org.apache.hadoop.io.LongWritable;
>>> >> import org.apache.hadoop.io.Text;
>>> >> import org.apache.hadoop.mapreduce.Job;
>>> >> import org.apache.hadoop.mapreduce.Mapper;
>>> >> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>> >>
>>> >> public class LogProcessApp extends Thread {
>>> >>
>>> >>  private static String sep;
>>> >>  private String x2;
>>> >>  private String x3;
>>> >>
>>> >>  public LogProcessApp(String arg1,String arg2,String arg3){
>>> >>   sep=arg1;
>>> >>   x2=arg2;
>>> >>   x3=arg3;
>>> >>  }
>>> >>
>>> >>  public static class CM extends Mapper<LongWritable, Text, Text, Text>{
>>> >>   private Text keyvar = new Text();
>>> >>   private Text valvar = new Text();
>>> >>   public void map(LongWritable key, Text value, Context context)
>>> >>     throws IOException, InterruptedException {
>>> >>    String line = value.toString();
>>> >>    try{
>>> >>     String data[] = line.split(sep);
>>> >>     keyvar.set(data[0]);
>>> >>     valvar.set(data[1]);
>>> >>     context.write(keyvar,valvar);
>>> >>    } catch (Exception e) {
>>> >>     return;
>>> >>    }
>>> >>   }
>>> >>  }
>>> >>
>>> >>  public void run(){
>>> >>   Configuration conf = new Configuration();
>>> >>   Job job = null;
>>> >>   try {
>>> >>    job = new Job(conf);
>>> >>   } catch (IOException e1) {
>>> >>    // TODO Auto-generated catch block
>>> >>    e1.printStackTrace();
>>> >>   }
>>> >>
>>> >>   job.setJobName("XXXJob");
>>> >>   job.setJarByClass(LogProcessApp.class);
>>> >>
>>> >>   job.setOutputKeyClass(Text.class);
>>> >>   job.setOutputValueClass(Text.class);
>>> >>
>>> >>   job.setMapperClass(CM.class);
>>> >>
>>> >>   job.setInputFormatClass(TextInputFormat.class);
>>> >>   job.setOutputFormatClass(TextOutputFormat.class);
>>> >>
>>> >>   try {
>>> >>    FileInputFormat.addInputPath(job, new Path(x2));
>>> >>   } catch (IOException e) {
>>> >>    // TODO Auto-generated catch block
>>> >>    e.printStackTrace();
>>> >>   }
>>> >>   FileOutputFormat.setOutputPath(job, new Path(x3));
>>> >>
>>> >>   try {
>>> >>    job.submit();
>>> >>   } catch (IOException e) {
>>> >>    // TODO Auto-generated catch block
>>> >>    e.printStackTrace();
>>> >>   } catch (InterruptedException e) {
>>> >>    // TODO Auto-generated catch block
>>> >>    e.printStackTrace();
>>> >>   } catch (ClassNotFoundException e) {
>>> >>    // TODO Auto-generated catch block
>>> >>    e.printStackTrace();
>>> >>   }
>>> >>
>>> >>  }
>>> >>
>>> >>
>>> >>  public static void main(String args[]){
>>> >>   LogProcessApp lpa1=new LogProcessApp(args[0],args[1],args[3]);
>>> >>   LogProcessApp lpa2=new LogProcessApp(args[4],args[5],args[6]);
>>> >>   LogProcessApp lpa3=new LogProcessApp(args[7],args[8],args[9]);
>>> >>   lpa1.start();
>>> >>   lpa2.start();
>>> >>   lpa3.start();
>>> >>  }
>>> >> }
>>> >
>>> >
>>>
>>>
>>>
>>> --
>>> Harsh J
>>
>>
>



-- 
Harsh J
