Hi again.
I am attempting to compare strings as JSON objects using hash codes, with
the ultimate goal of removing duplicates.
I have implemented the following solution:
1. I parse the input line into a JsonElement using the Google JSON parser
(Gson).
2. I take the hash code of the resulting JsonElement and use it as the
key of the <Key, Val> output pairs. It seems to work fine.
As I am new to Hadoop, I just want to run this by the community. Is there
some reason this wouldn't work?
Thank you very much for your help.
For reference, here is my code:
class DupFilter {
    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {
            // Skip null or empty lines.
            if (value == null || value.getLength() == 0)
                return;
            // Parse the line so that field order does not affect the hash.
            JsonElement je = PARSER.parse(value.toString());
            int hash = je.hashCode();
            // Key by the hash: duplicate lines land in the same reduce group.
            output.collect(new IntWritable(hash), value);
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {
            // Emit only the first line of each group, dropping the duplicates.
            output.collect(key, values.next());
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
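
As a sanity check on the assumption this relies on, parsing two reordered
lines and comparing them (a minimal sketch, assuming Gson 2.x on the
classpath):

JsonParser parser = new JsonParser();
JsonElement a = parser.parse("{\"ts\":1,\"debug\":false}");
JsonElement b = parser.parse("{\"debug\":false,\"ts\":1}");
System.out.println(a.equals(b));                  // true: field order ignored
System.out.println(a.hashCode() == b.hashCode()); // true: same reduce key

Gson backs a JsonObject with a Map, so equals() and hashCode() are
order-insensitive, which is exactly what keying on the hash depends on.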
On Fri, Jun 7, 2013 at 1:16 PM, Lance Norskog <[email protected]> wrote:
> A side point for Hadoop experts: a comparator is used for sorting in the
> shuffle. If a comparator always returns -1 for unequal objects, then
> sorting will take longer than it should, because a certain number of items
> will be compared more than once.
>
> Is this true?
>
> On 06/05/2013 04:10 PM, Max Lebedev wrote:
>
> I’ve taken your advice and made a wrapper class which implements
> WritableComparable. Thank you very much for your help. I believe everything
> is working fine on that front. I used Google's Gson for the comparison.
>
>
> public int compareTo(Object o) {
>     JsonElement o1 = PARSER.parse(this.json.toString());
>     JsonElement o2 = PARSER.parse(o.toString());
>     if (o2.equals(o1))
>         return 0;
>     else
>         return -1;
> }
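>
> Side note on the compareTo contract: returning -1 for every unequal pair is
> not antisymmetric, so it does not define a valid total order for the sort. A
> contract-respecting alternative could compare a canonicalized string form
> instead. A rough sketch, where canonicalize is a hypothetical helper
> introduced here (objects nested inside arrays are left as-is):
>
> private static JsonElement canonicalize(JsonElement e) {
>     if (!e.isJsonObject()) return e;
>     // Rebuild the object with members in sorted key order, so toString()
>     // yields identical text for any input field ordering.
>     java.util.TreeMap<String, JsonElement> sorted =
>             new java.util.TreeMap<String, JsonElement>();
>     for (java.util.Map.Entry<String, JsonElement> en : e.getAsJsonObject().entrySet())
>         sorted.put(en.getKey(), canonicalize(en.getValue()));
>     JsonObject out = new JsonObject();
>     for (java.util.Map.Entry<String, JsonElement> en : sorted.entrySet())
>         out.add(en.getKey(), en.getValue());
>     return out;
> }
>
> public int compareTo(Object o) {
>     String a = canonicalize(PARSER.parse(this.json.toString())).toString();
>     String b = canonicalize(PARSER.parse(o.toString())).toString();
>     return a.compareTo(b); // 0 exactly when the canonical forms match
> }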
>
>
> The problem I have now is that only consecutive duplicates are detected.
> Given 6 lines:
>
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}
> {"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}
> {"ts":1368758947.291035,"source":"sdk","isSecure":false,"version":2,"debug":false}
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>
> I get back 1, 3, 4, and 6. I should be getting 1, 3, and 4, as 6 is
> exactly equal to 1. If I switch 5 and 6, the original line 5 is no longer
> filtered (I get 1, 3, 4, 5, 6). I've noticed that the compareTo method is
> called a total of 13 times. I assume that in order for all 6 of the keys to
> be compared pairwise, 15 comparisons (6 choose 2) would need to be made. Am
> I missing something here? I've tested compareTo manually, and lines 1 and 6
> are interpreted as equal. My MapReduce code currently looks like this:
>
> class DupFilter {
>     private static final Gson GSON = new Gson();
>     private static final JsonParser PARSER = new JsonParser();
>
>     public static class Map extends MapReduceBase implements
>             Mapper<LongWritable, Text, JSONWrapper, IntWritable> {
>         public void map(LongWritable key, Text value,
>                 OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
>                 throws IOException {
>             JsonElement je = PARSER.parse(value.toString()); // validates the line parses
>             JSONWrapper jow = new JSONWrapper(value.toString());
>             output.collect(jow, new IntWritable(1));
>         }
>     }
>
>     public static class Reduce extends MapReduceBase implements
>             Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {
>         public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
>                 OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
>                 throws IOException {
>             int sum = 0;
>             while (values.hasNext())
>                 sum += values.next().get();
>             output.collect(jow, new IntWritable(sum)); // key plus its duplicate count
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         JobConf conf = new JobConf(DupFilter.class);
>         conf.setJobName("dupfilter");
>         conf.setOutputKeyClass(JSONWrapper.class);
>         conf.setOutputValueClass(IntWritable.class);
>         conf.setMapperClass(Map.class);
>         conf.setReducerClass(Reduce.class);
>         conf.setInputFormat(TextInputFormat.class);
>         conf.setOutputFormat(TextOutputFormat.class);
>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>         JobClient.runJob(conf);
>     }
> }
>
> Thanks,
>
> Max Lebedev
>
>
> On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee <
> [email protected]> wrote:
>
>> I agree with Shahab: you have to ensure that the keys are
>> WritableComparable and the values are Writable in order to be used in MR.
>>
>> You can have a WritableComparable implementation wrapping the actual
>> JSON object.
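>>
>> Something like this skeleton, perhaps (a sketch; serialization delegates
>> to a Text holding the JSON line, and the compareTo shown is just a
>> placeholder raw-string comparison):
>>
>> public class JSONWrapper implements WritableComparable<JSONWrapper> {
>>     private Text json = new Text();
>>
>>     public JSONWrapper() {}                       // Hadoop needs a no-arg ctor
>>     public JSONWrapper(String s) { json.set(s); }
>>
>>     public void write(DataOutput out) throws IOException { json.write(out); }
>>     public void readFields(DataInput in) throws IOException { json.readFields(in); }
>>
>>     public int compareTo(JSONWrapper o) {
>>         // Placeholder: for JSON-level dedup this should compare a
>>         // canonical form, not the raw string.
>>         return json.toString().compareTo(o.json.toString());
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         // HashPartitioner uses hashCode(); it must be consistent with
>>         // compareTo() == 0, or equal keys may go to different reducers.
>>         return json.toString().hashCode();
>>     }
>> }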
>>
>> Thanks,
>> Rahul
>>
>>
>> On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> On 4 Jun 2013, at 23:49, Max Lebedev <[email protected]> wrote:
>>>
>>> Hi. I've been trying to use JSONObjects to identify duplicates in JSON
>>> strings. The duplicate strings contain the same data, but not necessarily
>>> in the same order. For example, the following two lines should be
>>> identified as duplicates (and filtered):
>>>
>>> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>>>
>>> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>>>
>>> Can you not use the timestamp as a URI and emit them as URIs? Then you
>>> would have your mapper emit the following key/value pair:
>>>
>>> output.collect(ts, value);
>>>
>>> And you would have a straightforward reducer that can dedup based on the
>>> timestamps.
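>>>
>>> The reducer would then only need to compare the few lines that share a
>>> timestamp; a rough sketch (old API and Gson parsing, to match your code):
>>>
>>> public void reduce(Text ts, Iterator<Text> lines,
>>>         OutputCollector<NullWritable, Text> output, Reporter reporter)
>>>         throws IOException {
>>>     // Keep one line per distinct parsed JSON value within this ts group.
>>>     java.util.Set<JsonElement> seen = new java.util.HashSet<JsonElement>();
>>>     while (lines.hasNext()) {
>>>         Text line = lines.next();
>>>         JsonElement je = new JsonParser().parse(line.toString());
>>>         if (seen.add(je)) // Gson's equals() ignores field order
>>>             output.collect(NullWritable.get(), line);
>>>     }
>>> }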
>>>
>>> If the above doesn't work for you, I would look at the Jackson library
>>> for mangling JSON in Java. Its method of using Java beans for JSON is
>>> clean from a code point of view and comes with lots of nice features:
>>> http://stackoverflow.com/a/2255893
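>>>
>>> For example (a sketch; Jackson's JsonNode.equals() compares object fields
>>> as a map, so field order does not matter):
>>>
>>> ObjectMapper mapper = new ObjectMapper();
>>> boolean duplicate = mapper.readTree(line1).equals(mapper.readTree(line2));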
>>>
>>> P.S. In your code you are using the older MapReduce API; I would look at
>>> using the newer APIs in the org.apache.hadoop.mapreduce package.
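>>>
>>> The Mapper shape in the new API looks roughly like this (a sketch,
>>> reusing the timestamp keying from above):
>>>
>>> public static class DedupMapper
>>>         extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
>>>     @Override
>>>     protected void map(LongWritable key, Text value, Context context)
>>>             throws IOException, InterruptedException {
>>>         // Context replaces the old API's OutputCollector and Reporter.
>>>         String ts = new JsonParser().parse(value.toString())
>>>                 .getAsJsonObject().get("ts").getAsString();
>>>         context.write(new Text(ts), value);
>>>     }
>>> }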
>>>
>>> Mischa
>>>
>>> This is the code:
>>>
>>> class DupFilter {
>>>     public static class Map extends MapReduceBase implements
>>>             Mapper<LongWritable, Text, JSONObject, Text> {
>>>         public void map(LongWritable key, Text value,
>>>                 OutputCollector<JSONObject, Text> output, Reporter reporter)
>>>                 throws IOException {
>>>             JSONObject jo = null;
>>>             try {
>>>                 jo = new JSONObject(value.toString());
>>>             } catch (JSONException e) {
>>>                 e.printStackTrace();
>>>             }
>>>             output.collect(jo, value);
>>>         }
>>>     }
>>>
>>>     public static class Reduce extends MapReduceBase implements
>>>             Reducer<JSONObject, Text, NullWritable, Text> {
>>>         public void reduce(JSONObject jo, Iterator<Text> lines,
>>>                 OutputCollector<NullWritable, Text> output, Reporter reporter)
>>>                 throws IOException {
>>>             output.collect(null, lines.next());
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         JobConf conf = new JobConf(DupFilter.class);
>>>         conf.setOutputKeyClass(JSONObject.class);
>>>         conf.setOutputValueClass(Text.class);
>>>         conf.setMapperClass(Map.class);
>>>         conf.setReducerClass(Reduce.class);
>>>         conf.setInputFormat(TextInputFormat.class);
>>>         conf.setOutputFormat(TextOutputFormat.class);
>>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>>         JobClient.runJob(conf);
>>>     }
>>> }
>>>
>>> I get the following error:
>>>
>>> java.lang.ClassCastException: class org.json.JSONObject
>>>     at java.lang.Class.asSubclass(Class.java:3027)
>>>     at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
>>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>>
>>> It looks like it has something to do with conf.setOutputKeyClass(). Am
>>> I doing something wrong here?
>>>
>>>
>>> Thanks,
>>>
>>> Max Lebedev
>>>
>>>
>>> _______________________________
>>> Mischa Tuffield PhD
>>> http://mmt.me.uk/
>>> @mischat