Thank you all. Actually, I don't expect this approach to improve performance. I need to process 3 different logs (each with a different format), and I just want to start processing all 3 of them at the same time from this one program, giving each thread a different separator so its mapper can handle its own log format.
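Roughly, what I am thinking of is to pass the separator to each job through its own Configuration instead of the shared static field, so every thread's job carries its own separator (the property name "log.separator" below is just a name I picked for illustration):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SeparatorMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String sep;
        private final Text keyvar = new Text();
        private final Text valvar = new Text();

        @Override
        protected void setup(Context context) {
            // Each map task reads the separator from its own job's configuration.
            sep = context.getConfiguration().get("log.separator", "\t");
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] data = value.toString().split(sep);
            if (data.length < 2) {
                return; // skip malformed lines
            }
            keyvar.set(data[0]);
            valvar.set(data[1]);
            context.write(keyvar, valvar);
        }
    }

Then each thread would just do conf.set("log.separator", sep) before new Job(conf) and point the job at its own input/output paths. Since job.submit() returns as soon as the job is handed to the cluster, the three jobs could probably even be submitted one after another from main() without any extra threads.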
2012/12/13 Yang <teddyyyy...@gmail.com>

> but I have run across some situations where I could benefit from
> multi-threading: if your hadoop mapper is prone to random access IO (such
> as looking up a TFile, or HBase, which ultimately makes a network call and
> then looks into a file segment), having multiple threads could utilize the
> CPU while IO is going on
>
>
> On Wed, Dec 12, 2012 at 10:47 AM, Harsh J <ha...@cloudera.com> wrote:
>
>> Exactly - A job is already designed to be properly parallel w.r.t. its
>> input, and this would just add additional overheads of job setup and
>> scheduling. If your per-record processing requires threaded work,
>> consider using the MultithreadedMapper/Reducer classes instead.
>>
>> On Wed, Dec 12, 2012 at 10:53 PM, Yang <teddyyyy...@gmail.com> wrote:
>> > I think it won't help much, since in a hadoop cluster, people already
>> > allocate "SLOTS" to be the number of cores, supposedly the inherent
>> > parallelism can be already exploited, since different mappers/reducers are
>> > completely independent.
>> >
>> >
>> > On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
>> >>
>> >> Dears,
>> >>
>> >> I suddenly got this idea to do a mapreduce job in a multi-threaded way.
>> >> I don't know if it can work. Could anyone give me some advice?
>> >> Here is the Java code:
>> >>
>> >>
>> >> import java.io.IOException;
>> >> import org.apache.hadoop.conf.Configuration;
>> >> import org.apache.hadoop.fs.Path;
>> >> import org.apache.hadoop.io.LongWritable;
>> >> import org.apache.hadoop.io.Text;
>> >> import org.apache.hadoop.mapreduce.Job;
>> >> import org.apache.hadoop.mapreduce.Mapper;
>> >> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>> >>
>> >> public class LogProcessApp extends Thread {
>> >>
>> >>     private static String sep;
>> >>     private String x2;
>> >>     private String x3;
>> >>
>> >>     public LogProcessApp(String arg1, String arg2, String arg3) {
>> >>         sep = arg1;
>> >>         x2 = arg2;
>> >>         x3 = arg3;
>> >>     }
>> >>
>> >>     public static class CM extends Mapper<LongWritable, Text, Text, Text> {
>> >>         private Text keyvar = new Text();
>> >>         private Text valvar = new Text();
>> >>
>> >>         public void map(LongWritable key, Text value, Context context)
>> >>                 throws IOException, InterruptedException {
>> >>             String line = value.toString();
>> >>             try {
>> >>                 String data[] = line.split(sep);
>> >>                 keyvar.set(data[0]);
>> >>                 valvar.set(data[1]);
>> >>                 context.write(keyvar, valvar);
>> >>             } catch (Exception e) {
>> >>                 return;
>> >>             }
>> >>         }
>> >>     }
>> >>
>> >>     public void run() {
>> >>         Configuration conf = new Configuration();
>> >>         Job job = null;
>> >>         try {
>> >>             job = new Job(conf);
>> >>         } catch (IOException e1) {
>> >>             e1.printStackTrace();
>> >>         }
>> >>
>> >>         job.setJobName("XXXJob");
>> >>         job.setJarByClass(LogProcessApp.class);
>> >>
>> >>         job.setOutputKeyClass(Text.class);
>> >>         job.setOutputValueClass(Text.class);
>> >>
>> >>         job.setMapperClass(CM.class);
>> >>
>> >>         job.setInputFormatClass(TextInputFormat.class);
>> >>         job.setOutputFormatClass(TextOutputFormat.class);
>> >>
>> >>         try {
>> >>             FileInputFormat.addInputPath(job, new Path(x2));
>> >>         } catch (IOException e) {
>> >>             e.printStackTrace();
>> >>         }
>> >>         FileOutputFormat.setOutputPath(job, new Path(x3));
>> >>         try {
>> >>             job.submit();
>> >>         } catch (IOException e) {
>> >>             e.printStackTrace();
>> >>         } catch (InterruptedException e) {
>> >>             e.printStackTrace();
>> >>         } catch (ClassNotFoundException e) {
>> >>             e.printStackTrace();
>> >>         }
>> >>     }
>> >>
>> >>     public static void main(String args[]) {
>> >>         LogProcessApp lpa1 = new LogProcessApp(args[0], args[1], args[2]);
>> >>         LogProcessApp lpa2 = new LogProcessApp(args[3], args[4], args[5]);
>> >>         LogProcessApp lpa3 = new LogProcessApp(args[6], args[7], args[8]);
>> >>         lpa1.start();
>> >>         lpa2.start();
>> >>         lpa3.start();
>> >>     }
>> >> }
>> >
>> >
>>
>>
>> --
>> Harsh J
>>
>
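Regarding the MultithreadedMapper/Reducer classes Harsh mentioned: if I understand it right, the usage would be roughly like the sketch below, which runs several copies of an ordinary mapper inside one map task. The thread count of 8 and the WrappedLogMapper / MultithreadedLogDriver names are just placeholders I made up:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultithreadedLogDriver {

        // Ordinary mapper; MultithreadedMapper creates one instance per thread.
        public static class WrappedLogMapper
                extends Mapper<LongWritable, Text, Text, Text> {
            private final Text k = new Text();
            private final Text v = new Text();

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] parts = value.toString().split("\t");
                if (parts.length >= 2) {
                    k.set(parts[0]);
                    v.set(parts[1]);
                    context.write(k, v);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "multithreaded-log-job");
            job.setJarByClass(MultithreadedLogDriver.class);

            // MultithreadedMapper is what the framework runs; it spawns
            // several threads, each executing the wrapped mapper class.
            job.setMapperClass(MultithreadedMapper.class);
            MultithreadedMapper.setMapperClass(job, WrappedLogMapper.class);
            MultithreadedMapper.setNumberOfThreads(job, 8);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

From the discussion above, this only helps when the map work itself blocks on I/O (like the HBase/TFile lookups Yang mentioned); for plain log parsing, the usual one-thread-per-map-task model should already keep the slots busy.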