Thanks Sriram,

Many thanks for your prompt reply. I appreciate your input.
I am trying to compare two files: file1 (3 MB) and file2 (17 GB). I need to read each line from file1, look for its occurrences in file2, and get the frequencies. I am new to Hadoop, so I am not sure whether my approach is correct. Because of the size of the files, I couldn't process them quickly enough using Java, so I decided to try Hadoop. From what I have read in tutorials and found by searching the internet, when two files need to be compared, storing the smaller one in the distributed cache is one way to go about it. For instance:
http://developer.yahoo.com/hadoop/tutorial/module5.html#auxdata

Am I wrong?

SS

On Fri, Jul 20, 2012 at 9:18 AM, Sriram Ramachandrasekaran <sri.ram...@gmail.com> wrote:

> Hello,
> If I understand right, you are trying to run your map reduce on files that
> you shared via Distributed cache. Distributed Cache is not generally meant
> for it. It is available in case your MR needs other reference files, some
> archives, native libs, etc that needs to be shared across your cluster. The
> suggestion would be to load the file that you want to run your word
> frequencies on, to a HDFS file path and then read the job off it. You will
> need to configure your MR job to pick input files from your new HDFS
> location.
>
> This would be my approach. Other regulars in the forum will be better able
> to help you!
>
> -Sriram
>
>
> On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita <shanu.sushmit...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to solve a problem where I need to computed frequencies of
>> words occurring in a file1 from file 2.
>>
>> For example:
>> text in file1:
>> hadoop
>> user
>> hello
>> world
>>
>> and text in file2 is:
>> hadoop
>> user
>> hello
>> world
>> hadoop
>> hadoop
>> hadoop
>> user
>> world
>> world
>> world
>> hadoop
>> user
>> hello
>>
>> so the output should be:
>> hadoop 5
>> user 3
>> hello 2
>> world 4
>>
>> I read that distributed caching is a good way to do such jobs. Size of my
>> files are:
>> File1 = 17GB
>> File2 = 3 MB
>>
>> And here is my code:
>>
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.mapred.*;
>> import org.apache.hadoop.util.*;
>> import java.util.*;
>> import java.io.*;
>> import org.apache.hadoop.filecache.*;
>> import java.net.*;
>>
>> public class RepeatFreqTest
>> {
>>
>>   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
>>     private final static IntWritable one = new IntWritable(1);
>>     private Text word = new Text();
>>     private HashSet<String> dyads = new HashSet<String>();
>>
>>     public void configure(JobConf conf)
>>     {
>>       Path[] cacheFiles = new Path[0];
>>       try {
>>         cacheFiles = DistributedCache.getLocalCacheFiles(conf);
>>       } // end of try
>>       catch (IOException ioe) {
>>         System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
>>       } // end of catch
>>
>>       for (Path dyadPath : cacheFiles)
>>       {
>>         loadDyads(dyadPath);
>>       } // end of for cachePath
>>     } // end of configure
>>
>>     private void loadDyads(Path dyadPath)
>>     {
>>       try
>>       {
>>         BufferedReader wordReader = new BufferedReader(new FileReader(dyadPath.toString()));
>>         String line = null;
>>         while ((line = wordReader.readLine()) != null) {
>>           dyads.add(line);
>>         } // end of while
>>
>>         wordReader.close();
>>       } // end of try
>>       catch (IOException ioe) {
>>         System.err.println("IOException reading from distributed cache");
>>       } // end of catch
>>     } // end of loadDyads()
>>
>>     /* actual map() method, etc go here */
>>
>>     @Override
>>     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
>>     {
>>       // dyad, ut and year from all dyads (big file) file!!!
>>
>>       String line = value.toString();
>>       String[] tokens = line.split("\\|");
>>       String ut1 = tokens[0].trim();
>>       String dyad1 = tokens[1].trim();
>>       String year1 = tokens[2].trim();
>>       int y1 = Integer.parseInt(year1);
>>
>>       // dyad, ut and year from sample dyads file (sample file stored in the memory)!!!
>>
>>       Iterator it = dyads.iterator();
>>       while (it.hasNext())
>>       {
>>         //Text word = new Text();
>>         String setline = it.next().toString();
>>
>>         String[] tokens2 = setline.split("\\|");
>>         String ut2 = tokens2[0].trim();
>>         String dyad2 = tokens2[1].trim();
>>         String year2 = tokens2[2].trim();
>>         int y2 = Integer.parseInt(year2);
>>
>>         if (dyad1.equalsIgnoreCase(dyad2))
>>         {
>>           if (!(ut1.equalsIgnoreCase(ut2)))
>>           {
>>             if (y1 <= y2)
>>             {
>>               word.set(setline);
>>               output.collect(word, one);
>>             }
>>           } // end of if ut1!=ut2
>>         } //
>>       } // end of while
>>     } // end of override map
>>   } // end of big Map class
>>
>>   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
>>
>>     @Override
>>     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
>>       int sum = 0;
>>       while (values.hasNext()) {
>>         sum += values.next().get();
>>       } // end of while
>>       output.collect(key, new IntWritable(sum));
>>     } // end of override reduce
>>   } // end of Big Reduce
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     JobConf conf = new JobConf(RepeatFreqTest.class);
>>     conf.setJobName("Repeat");
>>
>>     conf.setOutputKeyClass(Text.class);
>>     conf.setOutputValueClass(IntWritable.class);
>>
>>     conf.setMapperClass(Map.class);
>>     conf.setCombinerClass(Reduce.class);
>>     conf.setReducerClass(Reduce.class);
>>
>>     conf.setInputFormat(TextInputFormat.class);
>>     conf.setOutputFormat(TextOutputFormat.class);
>>
>>     FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>
>>     DistributedCache.addCacheFile(new Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);
>>
>>     JobClient.runJob(conf);
>>
>>   } // end of main
>>
>> } // end of class
>>
>>
>> And I put my File1.txt and File2.txt in hdfs as follows:
>>
>> $HADOOP_HOME/bin/hadoop fs -mkdir input
>> $HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
>> $HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
>>
>> My problem is that my code compiles fine, but it would just not proceed
>> from map 0% reduce 0% stage.
>>
>> What I am doing wrong?
>>
>> Any suggestion would be of great help.
>>
>> Best,
>> SS
>
> --
> It's just about how deep your longing is!
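
For reference, the simple word-count example in the thread (keep the small file in memory, stream the large one, count the matches) could be sketched in plain Java roughly as follows. This is only an illustration under assumptions: it has no Hadoop dependencies, it matches whole lines exactly, and the names FrequencyLookup and countOccurrences are hypothetical. It does not reproduce the dyad/ut/year comparison in the posted mapper.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the posted job or of Hadoop.
final class FrequencyLookup {

    // Counts how often each key (a line of the small file) occurs among the
    // lines of the large file. In practice the lines would be streamed from
    // a reader or arrive one at a time in map(); an Iterable stands in here.
    static Map<String, Integer> countOccurrences(Set<String> keys, Iterable<String> bigFileLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : bigFileLines) {
            String candidate = line.trim();
            // One hash lookup per large-file line instead of a scan over all keys.
            if (keys.contains(candidate)) {
                counts.merge(candidate, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample data taken from the example in the quoted message.
        Set<String> keys = new HashSet<>(Arrays.asList("hadoop", "user", "hello", "world"));
        List<String> big = Arrays.asList(
                "hadoop", "user", "hello", "world", "hadoop", "hadoop", "hadoop",
                "user", "world", "world", "world", "hadoop", "user", "hello");
        countOccurrences(keys, big).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

The point of the sketch is the set membership test: each line of the large file costs one lookup, whereas iterating over every cached entry for every input record multiplies the work by the size of the small file.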