Hi,

I am trying to solve a problem where I need to compute, from file2, the frequency of each word listed in file1.

For example:
text in file1:
hadoop
user
hello
world

and text in file2 is:
hadoop
user
hello
world
hadoop
hadoop
hadoop
user
world
world
world
hadoop
user
hello

so the output should be:
hadoop 5
user 3
hello 2
world 4

I read that distributed caching is a good way to do such jobs. The sizes of my files are:
File1 = 17GB
File2 = 3 MB
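
What I am trying to do is: load the small word list from the distributed cache into a HashSet in configure(), and then look each record of the big input file up in that set in map(). For the simple word example above, a rough sketch of that mapper would look like this (class and variable names here are just for illustration; my real records are pipe-delimited with an id and a year, as in the full code below):

import java.io.*;
import java.util.*;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

// Sketch only: the cached file holds the word list (file1), and the mapper
// emits (word, 1) for every line of the input (file2) that matches a listed word.
public class WordFreqMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    private final Set<String> wordList = new HashSet<String>();

    public void configure(JobConf conf) {
        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            if (cacheFiles != null) {
                for (Path p : cacheFiles) {
                    BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        wordList.add(line.trim());
                    }
                    reader.close();
                }
            }
        } catch (IOException ioe) {
            System.err.println("Could not read cached word list: " + ioe);
        }
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String token = value.toString().trim();
        // Emit a count only for lines that appear in the cached word list.
        if (wordList.contains(token)) {
            word.set(token);
            output.collect(word, one);
        }
    }
}

The Reduce class and the driver would be the same as in my actual code below.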

And here is my code:




import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.util.*;
import java.io.*;
import org.apache.hadoop.filecache.*;
import java.net.*;

public class RepeatFreqTest {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private HashSet<String> dyads = new HashSet<String>();

        // Load every file from the distributed cache into the in-memory set
        // before any map() calls are made.
        public void configure(JobConf conf) {

            Path[] cacheFiles = new Path[0];
            try {
                cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            } catch (IOException ioe) {
                System.err.println("Caught exception while getting cached files: "
                        + StringUtils.stringifyException(ioe));
            }

            for (Path dyadPath : cacheFiles) {
                loadDyads(dyadPath);
            }
        } // end of configure

        // Read one cached file line by line into the dyads set.
        private void loadDyads(Path dyadPath) {
            try {
                BufferedReader wordReader = new BufferedReader(new FileReader(dyadPath.toString()));
                String line = null;
                while ((line = wordReader.readLine()) != null) {
                    dyads.add(line);
                }
                wordReader.close();
            } catch (IOException ioe) {
                System.err.println("IOException reading from distributed cache");
            }
        } // end of loadDyads()



        /* actual map() method */

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

            // dyad, ut and year from the big file (all dyads)
            String line = value.toString();
            String[] tokens = line.split("\\|");
            String ut1 = tokens[0].trim();
            String dyad1 = tokens[1].trim();
            String year1 = tokens[2].trim();
            int y1 = Integer.parseInt(year1);

            // dyad, ut and year from the sample dyads file (held in memory via the cache)
            Iterator<String> it = dyads.iterator();
            while (it.hasNext()) {
                String setline = it.next();

                String[] tokens2 = setline.split("\\|");
                String ut2 = tokens2[0].trim();
                String dyad2 = tokens2[1].trim();
                String year2 = tokens2[2].trim();
                int y2 = Integer.parseInt(year2);

                if (dyad1.equalsIgnoreCase(dyad2)) {
                    if (!(ut1.equalsIgnoreCase(ut2))) {
                        if (y1 <= y2) {
                            word.set(setline);
                            output.collect(word, one);
                        }
                    } // end of if ut1 != ut2
                }
            } // end of while
        } // end of map
    } // end of Map class


    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        } // end of reduce
    } // end of Reduce class


    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(RepeatFreqTest.class);
        conf.setJobName("Repeat");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Register the lookup file with the DistributedCache before submitting the job.
        DistributedCache.addCacheFile(new Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);

        JobClient.runJob(conf);
    } // end of main

} // end of class


And I put File1.txt and File2.txt into HDFS as follows:

$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
$HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
$HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
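
I then launch the job roughly like this (the jar name here is just a placeholder):

$HADOOP_HOME/bin/hadoop jar RepeatFreqTest.jar RepeatFreqTest input output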

My problem is that the code compiles fine, but the job just does not proceed past the map 0% reduce 0% stage.

What am I doing wrong?

Any suggestion would be of great help.

Best,
SS
