---------- Forwarded message ----------
From: Vikas Jadhav <[email protected]>
Date: Thu, Jan 31, 2013 at 11:14 PM
Subject: Re: Issue with Reduce Side join using datajoin package
To: [email protected]
***************source ****************
public class MyJoin extends Configured implements Tool {
public static class MapClass extends DataJoinMapperBase {
protected Text generateInputTag(String inputFile) {
System.out.println("Starting generateInputTag() : "+inputFile);
String datasource = inputFile.split("-")[0];
return new Text(datasource);
}
protected Text generateGroupKey(TaggedMapOutput aRecord) {
System.out.println(" Statring generateGroupKey() : "+aRecord);
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
}
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
System.out.println("starting generateTaggedMapOutput() value
: "+value);
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}
public static class Reduce extends DataJoinReducerBase {
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
System.out.println("combine :");
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++) {
if (i > 0) joinedStr += ",";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",", 2);
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
public static class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable()
{
this.tag = new Text();
}//end empty( taking no parameters) constructor TaggedWritable
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
public Writable getData() {
return data;
}
public void write(DataOutput out) throws IOException {
//System.out.println(");
this.tag.write(out);
this.data.write(out);
System.out.println("Tag :"+tag+" Data :"+ data);
}
/*
public void readFields(DataInput in) throws IOException {
System.out.println(" Starting short readFields(): "+ in);
this.tag.readFields(in);
this.data.readFields(in);
} */
public void readFields(DataInput in) throws IOException {
System.out.println(" Starting short readFields(): "+ in);
this.tag.readFields(in);
String w = in.toString();
if(this.data == null)
try {
this.data =(Writable)
ReflectionUtils.newInstance(Class.forName(w), null);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.data.readFields(in);
}
}
public int run(String[] args) throws Exception {
System.out.println("Starting run() Method:");
Configuration conf = getConf();
conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/core-site.xml"));
conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/mapred-site.xml"));
conf.addResource(new
Path("/home/vikas/project/hadoop-1.0.3/conf/hdfs-site.xml"));
JobConf job = new JobConf(conf, MyJoin.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin_cust X order");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
System.out.println("Starting main() function:");
int res = ToolRunner.run(new Configuration(),
new MyJoin(),
args);
System.exit(res);
}
}
************************* and error *********************************************
13/01/31 23:04:26 INFO util.NativeCodeLoader: Loaded the native-hadoop
library
13/01/31 23:04:26 WARN snappy.LoadSnappy: Snappy native library not loaded
13/01/31 23:04:26 INFO mapred.FileInputFormat: Total input paths to process
: 2
13/01/31 23:04:26 INFO mapred.JobClient: Running job: job_201301312254_0004
13/01/31 23:04:27 INFO mapred.JobClient: map 0% reduce 0%
13/01/31 23:04:41 INFO mapred.JobClient: map 66% reduce 0%
13/01/31 23:04:47 INFO mapred.JobClient: map 100% reduce 0%
13/01/31 23:04:50 INFO mapred.JobClient: map 100% reduce 22%
13/01/31 23:04:58 INFO mapred.JobClient: Task Id :
attempt_201301312254_0004_r_000000_0, Status : FAILED
java.lang.NullPointerException
at MyJoin$TaggedWritable.readFields(MyJoin.java:125)
at
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
at
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
at
org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271)
at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211)
at
org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:249)
at
org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:245)
at
com.sas.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:107)
at com.sas.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:132)
at
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
--
*
*
*
Thanx and Regards*
* Vikas Jadhav*