I don't think it will help much: in a Hadoop cluster, the slots are typically already allocated to match the number of cores, so the inherent parallelism should already be exploited, and different mappers/reducers are completely independent anyway.
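For what it's worth, Job.submit() is non-blocking, so the client-side threads below buy nothing either: submitting the three jobs one after another from a single thread already runs them concurrently on the cluster. A rough sketch (job setup elided, job names made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitThree {
    public static void main(String[] args) throws Exception {
        Job[] jobs = new Job[3];
        for (int i = 0; i < 3; i++) {
            jobs[i] = Job.getInstance(new Configuration(), "job-" + i);
            // ... set mapper, formats, and input/output paths as in the code below ...
            jobs[i].submit();          // returns immediately; the job runs on the cluster
        }
        for (Job job : jobs) {         // optionally wait for all of them to finish
            while (!job.isComplete()) {
                Thread.sleep(5000);
            }
        }
    }
}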
On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <clouder...@gmail.com> wrote:
> Dears,
>
> I suddenly got the idea of running MapReduce jobs in a multi-threaded way.
> I don't know if it can work. Could anyone give me some advice?
> Here is the Java code:
>
> import java.io.IOException;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>
> public class LogProcessApp extends Thread {
>
>     // The separator must travel with the job's Configuration: a static
>     // field set in the constructor races between the three threads and
>     // is invisible to mapper JVMs running on other nodes anyway.
>     private String sep;
>     private String x2;
>     private String x3;
>
>     public LogProcessApp(String arg1, String arg2, String arg3) {
>         sep = arg1;
>         x2 = arg2;
>         x3 = arg3;
>     }
>
>     public static class CM extends Mapper<LongWritable, Text, Text, Text> {
>         private Text keyvar = new Text();
>         private Text valvar = new Text();
>         private String sep;
>
>         @Override
>         protected void setup(Context context) {
>             // Read the separator back out of the job configuration.
>             sep = context.getConfiguration().get("log.separator");
>         }
>
>         @Override
>         public void map(LongWritable key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             String line = value.toString();
>             try {
>                 // Note: String.split treats sep as a regex.
>                 String[] data = line.split(sep);
>                 keyvar.set(data[0]);
>                 valvar.set(data[1]);
>                 context.write(keyvar, valvar);
>             } catch (Exception e) {
>                 // Skip malformed lines.
>             }
>         }
>     }
>
>     public void run() {
>         try {
>             Configuration conf = new Configuration();
>             conf.set("log.separator", sep);   // per-job copy of the separator
>             Job job = Job.getInstance(conf, "XXXJob");
>             job.setJarByClass(LogProcessApp.class);   // was CMR.class, which is not defined here
>
>             job.setOutputKeyClass(Text.class);
>             job.setOutputValueClass(Text.class);
>
>             job.setMapperClass(CM.class);
>
>             job.setInputFormatClass(TextInputFormat.class);
>             job.setOutputFormatClass(TextOutputFormat.class);
>
>             FileInputFormat.addInputPath(job, new Path(x2));
>             FileOutputFormat.setOutputPath(job, new Path(x3));
>
>             job.submit();
>         } catch (IOException | InterruptedException | ClassNotFoundException e) {
>             e.printStackTrace();
>         }
>     }
>
>     public static void main(String[] args) {
>         // args come in consecutive triples: separator, input path, output path.
>         // The original indexing (args[0], args[1], args[3], args[4], ...) skipped
>         // args[2] and ran off the expected layout.
>         LogProcessApp lpa1 = new LogProcessApp(args[0], args[1], args[2]);
>         LogProcessApp lpa2 = new LogProcessApp(args[3], args[4], args[5]);
>         LogProcessApp lpa3 = new LogProcessApp(args[6], args[7], args[8]);
>         lpa1.start();
>         lpa2.start();
>         lpa3.start();
>     }
> }
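P.S. If the intent was multiple threads within one map task, rather than multiple concurrent jobs, Hadoop already ships a wrapper for that: MultithreadedMapper. A rough sketch (the thread count is an arbitrary example, and the wrapped mapper must be thread-safe):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MultithreadedSetup {
    // Make each map task of 'job' run CM on several threads.
    // MultithreadedMapper creates one CM instance per thread, so the
    // per-instance Text fields in CM stay confined to their thread.
    static void enableMapThreads(Job job, int threads) {
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, LogProcessApp.CM.class);
        MultithreadedMapper.setNumberOfThreads(job, threads);
    }
}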