I suppose you could also leverage the job configuration, or per-input mapper implementations via MultipleInputs, to do this.
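For concreteness, here is a minimal sketch of that MultipleInputs route: one job, one mapper implementation per input path, each hard-coding its own separator. The class names, separators, and argument layout below are illustrative, not from the thread:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiLogJob {

    // Shared parsing logic; subclasses only fix the separator.
    public abstract static class LogMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        protected abstract String separator();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] data = value.toString().split(separator());
            if (data.length >= 2) { // skip malformed lines
                context.write(new Text(data[0]), new Text(data[1]));
            }
        }
    }

    public static class CommaLogMapper extends LogMapper {
        protected String separator() { return ","; }
    }

    public static class TabLogMapper extends LogMapper {
        protected String separator() { return "\t"; }
    }

    public static class PipeLogMapper extends LogMapper {
        protected String separator() { return "\\|"; } // regex-escaped for split()
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("MultiLogJob");
        job.setJarByClass(MultiLogJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // One mapper implementation per input directory, all in a single job.
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, CommaLogMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, TabLogMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[2]),
                TextInputFormat.class, PipeLogMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Compared with three separate jobs, the three inputs here share one shuffle and one output directory, which matters if the logs ultimately feed the same downstream processing.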
On Thu, Dec 13, 2012 at 5:44 PM, Yu Yang <clouder...@gmail.com> wrote:
> Thank you all. In fact, I don't expect this approach to improve
> performance. I need to process 3 different logs (each with a different
> format), and I just want to start processing all 3 logs at the same
> time, all in this one program, giving each thread a different separator
> so its maps can handle a different log.
>
>
> 2012/12/13 Yang <teddyyyy...@gmail.com>
>>
>> But I have run across some situations where I could benefit from
>> multi-threading: if your Hadoop mapper is prone to random-access IO
>> (such as looking up a TFile, or HBase, which ultimately makes a
>> network call and then looks into a file segment), having multiple
>> threads can keep the CPU busy while the IO is going on.
>>
>>
>> On Wed, Dec 12, 2012 at 10:47 AM, Harsh J <ha...@cloudera.com> wrote:
>>>
>>> Exactly - a job is already designed to be properly parallel w.r.t.
>>> its input, and this would just add the extra overhead of job setup
>>> and scheduling. If your per-record processing requires threaded
>>> work, consider using the MultithreadedMapper/Reducer classes
>>> instead.
>>>
>>> On Wed, Dec 12, 2012 at 10:53 PM, Yang <teddyyyy...@gmail.com> wrote:
>>> > I think it won't help much, since in a Hadoop cluster people
>>> > already allocate "SLOTS" to match the number of cores, so the
>>> > inherent parallelism should already be exploited: different
>>> > mappers/reducers are completely independent.
>>> >
>>> >
>>> > On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
>>> >>
>>> >> Dears,
>>> >>
>>> >> I suddenly got the idea to run MapReduce jobs in a multi-threaded
>>> >> way. I don't know whether it can work. Could anyone give me some
>>> >> advice? Here is the Java code:
>>> >>
>>> >>
>>> >> import java.io.IOException;
>>> >> import org.apache.hadoop.conf.Configuration;
>>> >> import org.apache.hadoop.fs.Path;
>>> >> import org.apache.hadoop.io.LongWritable;
>>> >> import org.apache.hadoop.io.Text;
>>> >> import org.apache.hadoop.mapreduce.Job;
>>> >> import org.apache.hadoop.mapreduce.Mapper;
>>> >> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>>> >> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>> >>
>>> >> public class LogProcessApp extends Thread {
>>> >>
>>> >>     private String sep;
>>> >>     private String inputPath;
>>> >>     private String outputPath;
>>> >>
>>> >>     public LogProcessApp(String sep, String inputPath, String outputPath) {
>>> >>         this.sep = sep;
>>> >>         this.inputPath = inputPath;
>>> >>         this.outputPath = outputPath;
>>> >>     }
>>> >>
>>> >>     public static class CM extends Mapper<LongWritable, Text, Text, Text> {
>>> >>         private String sep;
>>> >>         private Text keyvar = new Text();
>>> >>         private Text valvar = new Text();
>>> >>
>>> >>         protected void setup(Context context) {
>>> >>             // Mappers run in separate JVMs, so the separator must
>>> >>             // travel through the job configuration; a static field
>>> >>             // set in the driver would be null here.
>>> >>             sep = context.getConfiguration().get("log.separator");
>>> >>         }
>>> >>
>>> >>         public void map(LongWritable key, Text value, Context context)
>>> >>                 throws IOException, InterruptedException {
>>> >>             String[] data = value.toString().split(sep);
>>> >>             if (data.length >= 2) { // skip malformed lines
>>> >>                 keyvar.set(data[0]);
>>> >>                 valvar.set(data[1]);
>>> >>                 context.write(keyvar, valvar);
>>> >>             }
>>> >>         }
>>> >>     }
>>> >>
>>> >>     public void run() {
>>> >>         try {
>>> >>             Configuration conf = new Configuration();
>>> >>             conf.set("log.separator", sep); // hand the separator to the mappers
>>> >>
>>> >>             Job job = new Job(conf);
>>> >>             job.setJobName("XXXJob");
>>> >>             job.setJarByClass(LogProcessApp.class);
>>> >>
>>> >>             job.setOutputKeyClass(Text.class);
>>> >>             job.setOutputValueClass(Text.class);
>>> >>
>>> >>             job.setMapperClass(CM.class);
>>> >>
>>> >>             job.setInputFormatClass(TextInputFormat.class);
>>> >>             job.setOutputFormatClass(TextOutputFormat.class);
>>> >>
>>> >>             FileInputFormat.addInputPath(job, new Path(inputPath));
>>> >>             FileOutputFormat.setOutputPath(job, new Path(outputPath));
>>> >>
>>> >>             job.submit(); // non-blocking, so the three jobs run concurrently
>>> >>         } catch (Exception e) {
>>> >>             e.printStackTrace();
>>> >>         }
>>> >>     }
>>> >>
>>> >>     public static void main(String[] args) {
>>> >>         // args: three (separator, input path, output path) triples
>>> >>         LogProcessApp lpa1 = new LogProcessApp(args[0], args[1], args[2]);
>>> >>         LogProcessApp lpa2 = new LogProcessApp(args[3], args[4], args[5]);
>>> >>         LogProcessApp lpa3 = new LogProcessApp(args[6], args[7], args[8]);
>>> >>         lpa1.start();
>>> >>         lpa2.start();
>>> >>         lpa3.start();
>>> >>     }
>>> >> }
>>> >
>>> >
>>>
>>>
>>> --
>>> Harsh J
>>
>>
>

--
Harsh J
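As a footnote to Harsh's MultithreadedMapper suggestion above, here is a minimal sketch of what that looks like, reusing the CM mapper from Yu Yang's code; the thread count and paths are illustrative, and the "log.separator" key assumes the configuration-based separator shown in the code above. MultithreadedMapper runs several copies of the delegate mapper concurrently inside a single map task (one instance per thread), which pays off only when map() blocks on IO, such as the TFile/HBase lookups Yang describes, rather than on CPU:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultithreadedLogJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("log.separator", args[0]);

        Job job = new Job(conf);
        job.setJobName("MultithreadedLogJob");
        job.setJarByClass(MultithreadedLogJob.class);

        // MultithreadedMapper is the actual mapper; it spawns N threads
        // per map task, each with its own instance of the delegate.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, LogProcessApp.CM.class);
        MultithreadedMapper.setNumberOfThreads(job, 8);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because each thread gets its own delegate instance, CM's Text instance fields stay thread-confined; the record reader and writer are shared but synchronized by MultithreadedMapper itself.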