Hi,
I am trying to solve a problem where I need to compute the frequency of
each word from file1 as it occurs in file2.
For example:
text in file1:
hadoop
user
hello
world
and text in file2 is:
hadoop
user
hello
world
hadoop
hadoop
hadoop
user
world
world
world
hadoop
user
hello
so the output should be:
hadoop 5
user 3
hello 2
world 4
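Just to make the goal concrete, here is the same computation in plain
(single-machine) Java; the file names are only placeholders:
import java.io.*;
import java.util.*;
public class LocalWordFreq {
    public static void main(String[] args) throws IOException {
        // the words of interest come from the small file
        Set<String> words = new HashSet<String>();
        BufferedReader r1 = new BufferedReader(new FileReader("File1.txt"));
        String line;
        while ((line = r1.readLine()) != null) {
            words.add(line.trim());
        }
        r1.close();
        // count how often each of those words occurs in the big file
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        BufferedReader r2 = new BufferedReader(new FileReader("File2.txt"));
        while ((line = r2.readLine()) != null) {
            String w = line.trim();
            if (words.contains(w)) {
                Integer c = counts.get(w);
                counts.put(w, c == null ? 1 : c + 1);
            }
        }
        r2.close();
        for (java.util.Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}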
I read that distributed caching is a good way to do such jobs. The
sizes of my files are:
File1 = 17 GB
File2 = 3 MB
And here is my code:
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.util.*;
import java.io.*;
import org.apache.hadoop.filecache.*;
import java.net.*;
public class RepeatFreqTest
{
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
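// every line of the cached file, held in memory so map() can scan it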
private HashSet<String> dyads = new HashSet<String>();
public void configure(JobConf conf)
{
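// configure() runs once per task, before any map() calls, so the cached file is loaded here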
Path[] cacheFiles = new Path[0];
try {
cacheFiles = DistributedCache.getLocalCacheFiles(conf);
} // end of try
catch (IOException ioe) {
System.err.println("Caught exception while getting cached
files: " + StringUtils.stringifyException(ioe));
}// end of catch
for (Path dyadPath : cacheFiles)
{
loadDyads(dyadPath);
} // end of for cachePath
} // end of configure
private void loadDyads(Path dyadPath)
{
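// read the task-local copy of the cached file into the set, one line per entry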
try
{
BufferedReader wordReader = new BufferedReader(new FileReader(dyadPath.toString()));
String line = null;
while ((line = wordReader.readLine()) != null) {
dyads.add(line);
} // end of while
wordReader.close();
}// end of try
catch (IOException ioe) {
System.err.println("IOException reading from distributed cache");
} // end of catch
}// end of loadDyads()
/* actual map() method, etc go here */
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
// dyad, ut and year from a line of the big dyads file (the job input)
String line = value.toString();
String[] tokens = line.split("\\|");
String ut1 = tokens[0].trim();
String dyad1 = tokens[1].trim();
String year1 = tokens[2].trim();
int y1 = Integer.parseInt(year1);
// dyad, ut and year from the sample dyads file (the cached file held in memory)
Iterator<String> it = dyads.iterator();
while(it.hasNext())
{
String setline = it.next();
String[] tokens2 = setline.split("\
\|");
String ut2 = tokens2[0].trim();
String dyad2 = tokens2[1].trim();
String year2 = tokens2[2].trim();
int y2 = Integer.parseInt(year2);
if(dyad1.equalsIgnoreCase(dyad2))
{
if(!(ut1.equalsIgnoreCase(ut2)))
{
if(y1<=y2)
{
word.set(setline);
output.collect(word, one);
}
} // end of if ut1!=ut2
} // end of if dyad1 equals dyad2
}// end of while
} // end of override map
} // end of big Map class
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
} // end of while
output.collect(key, new IntWritable(sum));
} // end of override reduce
} // end of Big Reduce
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(RepeatFreqTest.class);
conf.setJobName("Repeat");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
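// reusing the reducer as a combiner is safe here because summing counts is associative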
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
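// the file registered below is copied to the local disk of every task node before the tasks start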
DistributedCache.addCacheFile(new Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);
JobClient.runJob(conf);
}// end of main
} // end of class
And I put my File1.txt and File2.txt into HDFS as follows:
$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
$HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
$HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
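If it helps, this is how the files can be listed to confirm they are in place:
$HADOOP_HOME/bin/hadoop fs -ls input
$HADOOP_HOME/bin/hadoop fs -ls cacheFiles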
My problem is that my code compiles and the job launches fine, but it
never gets past the map 0% reduce 0% stage.
What am I doing wrong?
Any suggestion would be of great help.
Best,
SS