Hi all,
What would be the right approach to solve the following two problems?
1. I need to output an object as the value from my map method. The object's
class should have two members: an ArrayList<String> and an integer.
I tried the following two approaches, but neither of them works:
* I wrote a class, MyCompositeValueWritable, that implements the Writable
interface. Inside the overridden readFields and write methods, I try to read/write
the list using the ObjectWritable class. [see attached file MyWordCount_ObjVal1_2.java]
* The custom class is a plain class, MyCompositeValue, that does not implement
or inherit anything. The map and reduce methods try to output <key,
value=<MyCompositeValue object>> pairs by wrapping the value in the ObjectWritable
class. [see attached file Case2.txt]
* Am I going wrong somewhere? I would appreciate any help.
2. I have another problem, in which I need two types of mappers and
reducers, and I want to execute them in this order:
* Mapper1 -> Reducer1 -> Mapper2 -> Reducer2
* Is this possible with the ChainMapper and/or ChainReducer classes? If
yes, how? Can anybody provide a simple working example, or point me to a
good URL that explains it?
* Currently, I am using a workaround:
* The first Mapper-Reducer pair writes its output to HDFS. The second
Mapper-Reducer pair then reads that output from HDFS and writes the further
processed output to another HDFS directory.
* An example would be really really helpful.
Thanks
Bhaskar Ghosh
"Ignorance is Bliss... Knowledge never brings Peace!!!"
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author Bhaskar Ghosh ([email protected])
*/
public class MyWordCount_ObjVal1_2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, MyCompositeValueWritable>{
private Text word = new Text();
private String localname;
@SuppressWarnings("deprecation")
public void setup(Context context) throws InterruptedException,
IOException {
super.setup(context);
localname = ((FileSplit)
context.getInputSplit()).getPath().getName();
}
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
MyCompositeValueWritable compositeValue = new
MyCompositeValueWritable();
compositeValue.addToList(localname);
compositeValue.setValue(1);
context.write (word, compositeValue);
}
}
}
public static class IntSumReducer
extends
Reducer<Text,MyCompositeValueWritable,Text,MyCompositeValueWritable> {
public void reduce(Text key, Iterable<MyCompositeValueWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
MyCompositeValueWritable compositeValue = new MyCompositeValueWritable();
for (MyCompositeValueWritable compositeValObj : values) {
if(compositeValObj instanceof MyCompositeValueWritable)
compositeValue = (MyCompositeValueWritable)
compositeValObj;
else
System.out.println("Object obtained as i/p to Reducer is
not MyCompositeValueWritable!");
System.out.println(compositeValue);
ArrayList<String> listOfFiles = compositeValue.getList();
compositeValue.mergeUnique(listOfFiles);
int occur = compositeValue.getValue();
sum += occur;
}
compositeValue.setValue(sum);
context.write(key, compositeValue);
}
}
/**
 * Map/reduce value carrying a list of distinct strings plus an integer count.
 * In this job, the list holds input-file names and the count holds word occurrences.
 *
 * <p>Serialization is written out by hand. Wrapping the list in an
 * {@code ObjectWritable} does not work for two reasons. First,
 * {@code ObjectWritable} cannot serialize a plain {@code java.util.ArrayList}.
 * Second, its {@code readFields} deserializes into the wrapper's own instance
 * rather than into {@code listOfString}. The count also has to be written with
 * {@code writeInt} so that it matches {@code readInt} on the read side.
 *
 * <p>Wire format: element count ({@code int}), each element
 * ({@code Text.writeString}), then the count ({@code int}).
 */
public static class MyCompositeValueWritable implements Writable
{
  private ArrayList<String> listOfString;
  private int value;

  /** Creates an empty value (no strings, count 0). Hadoop instantiates Writables reflectively through this constructor. */
  public MyCompositeValueWritable()
  {
    listOfString = new ArrayList<String>(1);
    value = 0;
  }

  /**
   * Replaces both members. The list is stored by reference, not copied.
   *
   * @param list the new string list (may be null)
   * @param val the new count
   */
  public void set(ArrayList<String> list, int val)
  {
    listOfString = list;
    value = val;
  }

  /**
   * Replaces this object's state with the next record read from {@code in}.
   *
   * @throws IOException if the stream is truncated or holds a negative list size
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int size = in.readInt();
    if (size < 0) {
      throw new IOException("Corrupt MyCompositeValueWritable: negative list size " + size);
    }
    // Build a fresh list rather than appending. Instances are reused between
    // records, and a list installed through set() may be shared with a caller.
    ArrayList<String> list = new ArrayList<String>();
    for (int i = 0; i < size; i++) {
      list.add(Text.readString(in));
    }
    listOfString = list;
    value = in.readInt();
  }

  /**
   * Writes the list (a null list is written as empty) followed by the count.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    if (listOfString == null) {
      out.writeInt(0);
    } else {
      out.writeInt(listOfString.size());
      for (String s : listOfString) {
        Text.writeString(out, s);
      }
    }
    // writeInt (4 bytes) pairs with readInt; out.write(int) would emit only 1 byte.
    out.writeInt(value);
  }

  /**
   * Adds {@code str} to the internal list unless it is already present, so the
   * list never contains duplicates.
   *
   * @param str the string to add
   * @return true if {@code str} was added; false if it was already present or the
   *     internal list is null
   */
  public boolean addToList(String str)
  {
    if (listOfString == null || listOfString.contains(str)) {
      return false;
    }
    listOfString.add(str);
    return true;
  }

  /** @param val the new count */
  public void setValue(int val)
  {
    value = val;
  }

  /** @return the internal list itself (not a copy); may be null after {@code set(null, ...)} */
  public ArrayList<String> getList()
  {
    return listOfString;
  }

  /** @return the current count */
  public int getValue()
  {
    return value;
  }

  /**
   * Adds every element of {@code list} that is not already present to the
   * internal list, creating the internal list first if it is null.
   *
   * @param list strings to merge in; null is ignored
   * @return the internal list after merging
   */
  public ArrayList<String> mergeUnique(ArrayList<String> list)
  {
    if (list != null)
    {
      if (listOfString == null)
        listOfString = new ArrayList<String>();
      for (int i = 0; i < list.size(); i++)
      {
        addToList(list.get(i));
      }
    }
    return this.listOfString;
  }

  /**
   * Formats the value as {@code <<a,b,c> , count>}. An empty or null list is
   * rendered as {@code <<> , count>}.
   */
  @Override
  public String toString()
  {
    StringBuilder returnStrBld = new StringBuilder("<<");
    if (listOfString != null) {
      for (int i = 0; i < listOfString.size(); i++) {
        if (i > 0) {
          returnStrBld.append(",");
        }
        returnStrBld.append(listOfString.get(i));
      }
    }
    returnStrBld.append(">");
    returnStrBld.append(" , " + value + ">");
    return returnStrBld.toString();
  }
}
/**
 * Configures and runs the word-count job, then exits with 0 on success or 1 on
 * failure.
 *
 * <p>Hadoop generic options (for example {@code -D key=value}) are consumed by
 * {@code GenericOptionsParser}. If at least two arguments remain, the first two
 * are used as the input and output paths. Otherwise the previous hard-coded
 * defaults {@code Input_Question_3} and {@code Output_Question_3} are used.
 *
 * @param args optional generic options followed by {@code <input> <output>}
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] remaining =
      new org.apache.hadoop.util.GenericOptionsParser(conf, args).getRemainingArgs();
  String[] otherArgs = remaining.length >= 2
      ? new String[] {remaining[0], remaining[1]}
      : new String[] {"Input_Question_3", "Output_Question_3"};
  Job job = new Job(conf, "word count");
  job.setJarByClass(MyWordCount_ObjVal1_2.class);
  job.setMapperClass(TokenizerMapper.class);
  // job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  // Map output types default to these job output types, which match the mapper.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(MyCompositeValueWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author Bhaskar Ghosh ([email protected])
*/
public class MyWordCount_ObjVal2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, ObjectWritable>{
private final static ObjectWritable compositeValObj = new ObjectWritable();
private Text word = new Text();
private String localname;
@SuppressWarnings("deprecation")
public void setup(Context context) throws InterruptedException,
IOException {
super.setup(context);
localname = ((FileSplit)
context.getInputSplit()).getPath().getName();
}
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
MyCompositeValue compositeValue = new MyCompositeValue();
compositeValue.addToList(localname);
compositeValue.setValue(1);
compositeValObj.setConf(context.getConfiguration());
compositeValObj.set(compositeValue);
context.write(word, compositeValObj);
}
}
}
public static class IntSumReducer
extends Reducer<Text,ObjectWritable,Text,ObjectWritable> {
private ObjectWritable compositeValObj = new ObjectWritable();
public void reduce(Text key, Iterable<ObjectWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
MyCompositeValue compositeValue = new MyCompositeValue();
for (ObjectWritable compositeValObj : values) {
compositeValue = (MyCompositeValue) compositeValObj.get();
ArrayList<String> listOfFiles = compositeValue.getList();
compositeValue.mergeUnique(listOfFiles);
int occur = compositeValue.getValue();
sum += occur;
}
compositeValue.setValue(sum);
compositeValObj.setConf(context.getConfiguration());
compositeValObj.set(compositeValue);
context.write(key, compositeValObj);
}
}
public static class MyCompositeValue
{
private ArrayList<String> listOfString;
private int value;
public MyCompositeValue()
{
listOfString = new ArrayList<String>(1);
value = 0;
}
public void set(ArrayList<String> list, int val)
{
listOfString = list;
value = val;
}
/**
* Adds the String str passed as parameter to the object's internal
ArrayList member. But, it ensures that
* Strings in the list do not repeat.
* So if the passed String exits it does not add, else it adds.
* @param str
* @return
*/
public boolean addToList(String str)
{
if(listOfString == null)
return false;
if(listOfString.contains(str))
return false;
else
listOfString.add(str);
return true;
}
public void setValue(int val)
{
value = val;
}
public ArrayList<String> getList()
{
return listOfString;
}
public int getValue()
{
return value;
}
/**
* Merges the list passed as parameter to the object's internal
ArrayList member. Also ensures that
* Strings in the list do not repeat
* @param list
*/
public ArrayList<String> mergeUnique(ArrayList<String> list)
{
if(list != null)
{
if(listOfString == null)
listOfString = new ArrayList<String>();
for(int i=0; i < list.size(); i++)
{
addToList(list.get(i));
}
}
return this.listOfString;
}
}
/**
 * Configures and runs the ObjectWritable-based word-count job, then exits with 0
 * on success or 1 on failure.
 *
 * <p>Hadoop generic options (for example {@code -D key=value}) are consumed by
 * {@code GenericOptionsParser}. If at least two arguments remain, the first two
 * are used as the input and output paths. Otherwise the previous hard-coded
 * defaults {@code Input_Question_3} and {@code Output_Question_3} are used.
 *
 * @param args optional generic options followed by {@code <input> <output>}
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] remaining =
      new org.apache.hadoop.util.GenericOptionsParser(conf, args).getRemainingArgs();
  String[] otherArgs = remaining.length >= 2
      ? new String[] {remaining[0], remaining[1]}
      : new String[] {"Input_Question_3", "Output_Question_3"};
  Job job = new Job(conf, "word count");
  job.setJarByClass(MyWordCount_ObjVal2.class);
  job.setMapperClass(TokenizerMapper.class);
  //job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  // Map output types default to these job output types, which match the mapper.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(ObjectWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}