Hi All,

What would be the right approach to solve these problems?
        1. I need to output an object as the value from my map method. The
object's class should have two members: an ArrayList<String> and an integer.

I tried the following two approaches, but neither works:
        * I wrote a class MyCompositeValueWritable that implements the Writable
interface. Inside the overridden readFields and write methods, I try to
read/write the list using the ObjectWritable class. [see attached file
MyWordCount_ObjVal1_2.java]

        * The custom class is a plain class, MyCompositeValue, that does not
implement or extend anything. The Map and Reduce methods try to output
<key, value = an object of MyCompositeValue> pairs by wrapping the value in an
ObjectWritable. [see attached file Case2.txt]

        * Where am I going wrong? Any help is appreciated.

        2. I have another problem, in which I need two types of mappers and
reducers, and I want to execute them in this order:
        * Mapper1 -> Reducer1 -> Mapper2 -> Reducer2
        * Is this possible with the ChainMapper and/or ChainReducer classes? If
yes, how? Can anybody provide a working starter example, or point me to a good
URL for the same?
        * Currently, I am using a workaround:
        * The first Mapper-Reducer pair writes to HDFS. Then the second
Mapper-Reducer pair picks up that output from HDFS and writes further-processed
output to another HDFS directory.
        * An example would be really helpful.
Thanks
Bhaskar Ghosh

"Ignorance is Bliss... Knowledge never brings Peace!!!"

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Bhaskar Ghosh ([email protected])
 */
public class MyWordCount_ObjVal1_2 {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, MyCompositeValueWritable>{
    
    private Text word = new Text();
    
        private String localname;

        @SuppressWarnings("deprecation")
        public void setup(Context context) throws InterruptedException,
                        IOException {
                super.setup(context);

                localname = ((FileSplit) 
context.getInputSplit()).getPath().getName();
        }

        
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        MyCompositeValueWritable compositeValue = new 
MyCompositeValueWritable();
        compositeValue.addToList(localname);
        compositeValue.setValue(1);
        context.write (word, compositeValue);
      }
    }
  }
  
  public static class IntSumReducer 
       extends 
Reducer<Text,MyCompositeValueWritable,Text,MyCompositeValueWritable> {

    public void reduce(Text key, Iterable<MyCompositeValueWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      MyCompositeValueWritable compositeValue = new MyCompositeValueWritable();
      
      for (MyCompositeValueWritable compositeValObj : values) {
              if(compositeValObj instanceof MyCompositeValueWritable)
                      compositeValue = (MyCompositeValueWritable) 
compositeValObj;
              else
                      System.out.println("Object obtained as i/p to Reducer is 
not MyCompositeValueWritable!");
              System.out.println(compositeValue);
              ArrayList<String> listOfFiles = compositeValue.getList();
              compositeValue.mergeUnique(listOfFiles);
              int occur = compositeValue.getValue();
              sum += occur;
      }
      compositeValue.setValue(sum);
      context.write(key, compositeValue);
    }
  }

  /**
   * A Writable pairing a de-duplicated list of strings (used here for file names)
   * with an int (used here as an occurrence count).
   *
   * <p>Wire format, produced by {@link #write} and consumed by {@link #readFields}:
   * <ol>
   *   <li>the element count as an int;</li>
   *   <li>each element as a {@link Text}-encoded string;</li>
   *   <li>the int value.</li>
   * </ol>
   *
   * <p>The list is serialized element by element because {@link ObjectWritable} cannot
   * serialize an {@code ArrayList}.
   */
  public static class MyCompositeValueWritable implements Writable {
    private ArrayList<String> listOfString;
    private int value;

    /** Creates an empty value: no strings and a value of 0. */
    public MyCompositeValueWritable() {
      listOfString = new ArrayList<String>(1);
      value = 0;
    }

    /**
     * Replaces both fields.
     *
     * @param list new list; stored by reference, not copied
     * @param val new int value
     */
    public void set(ArrayList<String> list, int val) {
      listOfString = list;
      value = val;
    }

    /**
     * Replaces this object's entire state with the data read from {@code in}, in the
     * format written by {@link #write}. Nothing from the previous state survives, which
     * matters because Hadoop may reuse one instance for many records.
     *
     * @throws IOException on read failure or a negative element count
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      int size = in.readInt();
      if (size < 0) {
        throw new IOException("Negative list size: " + size);
      }
      // A fresh list rather than clear(): set() may have aliased a caller's list.
      ArrayList<String> list = new ArrayList<String>(Math.max(size, 1));
      for (int i = 0; i < size; i++) {
        list.add(Text.readString(in));
      }
      listOfString = list;
      value = in.readInt();
    }

    /**
     * Serializes the list (count, then elements) followed by the int value. A null list
     * is written as an empty one.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      if (listOfString == null) {
        out.writeInt(0);
      } else {
        out.writeInt(listOfString.size());
        for (String s : listOfString) {
          Text.writeString(out, s);
        }
      }
      // writeInt, not write(int): write(int) emits only the low-order byte, whereas
      // readFields reads the value back with readInt() (four bytes).
      out.writeInt(value);
    }

    /**
     * Adds {@code str} to the internal list unless it is already present, so the list
     * never contains duplicates.
     *
     * @param str string to add
     * @return true if it was added; false if it already exists or the list is null
     */
    public boolean addToList(String str) {
      if (listOfString == null || listOfString.contains(str)) {
        return false;
      }
      listOfString.add(str);
      return true;
    }

    /** Sets the int value. */
    public void setValue(int val) {
      value = val;
    }

    /** Returns the internal list itself (not a copy); may be null after set(null, ...). */
    public ArrayList<String> getList() {
      return listOfString;
    }

    /** Returns the int value. */
    public int getValue() {
      return value;
    }

    /**
     * Adds every element of {@code list} that is not already present to the internal
     * list, creating the internal list first if it is null.
     *
     * @param list strings to merge; ignored when null
     * @return the internal list (null only if it was null and {@code list} was null)
     */
    public ArrayList<String> mergeUnique(ArrayList<String> list) {
      if (list != null) {
        if (listOfString == null) {
          listOfString = new ArrayList<String>();
        }
        for (String s : list) {
          addToList(s);
        }
      }
      return this.listOfString;
    }

    /**
     * Renders as {@code <<s1,s2,...> , value>}, e.g. {@code <<a.txt,b.txt> , 3>}.
     * An empty or null list renders as {@code <<> , value>}.
     */
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder("<<");
      if (listOfString != null) {
        for (int i = 0; i < listOfString.size(); i++) {
          if (i > 0) {
            sb.append(',');
          }
          sb.append(listOfString.get(i));
        }
      }
      sb.append('>');
      sb.append(" , ").append(value).append('>');
      return sb.toString();
    }
  }
  
  /**
   * Configures and runs the word-count job, then exits with 0 on success and 1 on
   * failure.
   *
   * @param args optional {@code <input path> <output path>}. When fewer than two
   *     arguments are given, the defaults {@code Input_Question_3} and
   *     {@code Output_Question_3} are used. Extra arguments are ignored.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = (args != null && args.length >= 2)
        ? args
        : new String[]{"Input_Question_3", "Output_Question_3"};

    Job job = new Job(conf, "word count");
    job.setJarByClass(MyWordCount_ObjVal1_2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    // The map output types equal the final output types, so these two calls
    // cover both.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MyCompositeValueWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Bhaskar Ghosh ([email protected])
 */
public class MyWordCount_ObjVal2 {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, ObjectWritable>{
    
    private final static ObjectWritable compositeValObj = new ObjectWritable();
    private Text word = new Text();
    
        private String localname;

        @SuppressWarnings("deprecation")
        public void setup(Context context) throws InterruptedException,
                        IOException {
                super.setup(context);

                localname = ((FileSplit) 
context.getInputSplit()).getPath().getName();
        }

        
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        MyCompositeValue compositeValue = new MyCompositeValue();
        compositeValue.addToList(localname);
        compositeValue.setValue(1);
        compositeValObj.setConf(context.getConfiguration());
        compositeValObj.set(compositeValue);
        context.write(word, compositeValObj);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,ObjectWritable,Text,ObjectWritable> {
    private ObjectWritable compositeValObj = new ObjectWritable();

    public void reduce(Text key, Iterable<ObjectWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      MyCompositeValue compositeValue = new MyCompositeValue();
      
      for (ObjectWritable compositeValObj : values) {
              compositeValue = (MyCompositeValue) compositeValObj.get();
              ArrayList<String> listOfFiles = compositeValue.getList();
              compositeValue.mergeUnique(listOfFiles);
              int occur = compositeValue.getValue();
              sum += occur;
      }
      compositeValue.setValue(sum);
      compositeValObj.setConf(context.getConfiguration());
      compositeValObj.set(compositeValue);
      context.write(key, compositeValObj);
    }
  }

  public static class MyCompositeValue
  {
          private ArrayList<String> listOfString;
          private int value;
          
          public MyCompositeValue()
          {
                  listOfString = new ArrayList<String>(1);
                  value = 0;
          }
          
          public void set(ArrayList<String> list, int val)
          {
                  listOfString = list;
                  value = val;
          }
          
          /**
           * Adds the String str passed as parameter to the object's internal 
ArrayList member. But, it ensures that
           *  Strings in the list do not repeat.
           * So if the passed String exits it does not add, else it adds.
           * @param str
           * @return
           */
          public boolean addToList(String str)
          {
                  if(listOfString == null)
                                  return false;
                  
                  if(listOfString.contains(str))
                          return false;
                  else
                          listOfString.add(str);
                  
                  return true;
          }
          
          public void setValue(int val)
          {
                  value = val;
          }
          
          public ArrayList<String> getList()
          {
                  return listOfString;
          }
          
          public int getValue()
          {
                  return value;
          }
          
          /**
           * Merges the list passed as parameter to the object's internal 
ArrayList member. Also ensures that
           *  Strings in the list do not repeat
           * @param list
           */
          public ArrayList<String> mergeUnique(ArrayList<String> list)
          {
                  if(list != null)
                  {  
                          if(listOfString == null)
                                  listOfString = new ArrayList<String>();
                          
                          for(int i=0; i < list.size(); i++)
                          {
                                  addToList(list.get(i));                       
   
                          }
                  }
                  
                  return this.listOfString;
          }
  }
  
  /**
   * Configures and runs the word-count job, then exits with 0 on success and 1 on
   * failure.
   *
   * @param args optional {@code <input path> <output path>}. When fewer than two
   *     arguments are given, the defaults {@code Input_Question_3} and
   *     {@code Output_Question_3} are used. Extra arguments are ignored.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = (args != null && args.length >= 2)
        ? args
        : new String[]{"Input_Question_3", "Output_Question_3"};

    Job job = new Job(conf, "word count");
    job.setJarByClass(MyWordCount_ObjVal2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    // The map output types equal the final output types, so these two calls
    // cover both.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(ObjectWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Reply via email to