I’ve taken your advice and made a wrapper class which implements
WritableComparable. Thank you very much for your help. I believe everything
is working fine on that front. I used Google's Gson for the comparison:
public int compareTo(Object o) {
    JsonElement o1 = PARSER.parse(this.json.toString());
    JsonElement o2 = PARSER.parse(o.toString());
    if (o2.equals(o1))
        return 0;
    else
        return -1; // note: never returns 1
}
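Could the problem be the Comparable contract? My compareTo never returns 1, so
sgn(a.compareTo(b)) == -sgn(b.compareTo(a)) can't hold for distinct keys. If
that matters, I imagine a contract-respecting version would look something
like this untested sketch, where canonicalize() is a hypothetical helper that
re-serializes the JSON with its keys sorted:
public int compareTo(Object o) {
    // canonicalize() (hypothetical) re-serializes JSON with sorted keys,
    // so JSON-equal inputs produce identical strings.
    String a = canonicalize(this.json.toString());
    String b = canonicalize(o.toString());
    return a.compareTo(b); // 0 exactly when the two JSON objects are equal
}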
The problem I have now is that only consecutive duplicates are detected.
Given 6 lines:
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}
{"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}
{"ts":1368758947.291035,
"source":"sdk","isSecure":false,"version":2,"debug":false}
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
I get back lines 1, 3, 4, and 6. I should be getting only 1, 3, and 4, since
line 6 is exactly equal to line 1. If I swap lines 5 and 6, the original line
5 is no longer filtered (I get 1, 3, 4, 5, 6). I've noticed that compareTo is
called a total of 13 times. I assume that for all 6 keys to be compared
against each other, 15 comparisons (6 choose 2) would be needed. Am I missing
something here? I've tested compareTo manually, and lines 1 and 6 are
interpreted as equal. My MapReduce code currently looks like this:
public class DupFilter {
    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, JSONWrapper, IntWritable> {
        public void map(LongWritable key, Text value,
                OutputCollector<JSONWrapper, IntWritable> output,
                Reporter reporter) throws IOException {
            // Parse up front so malformed input fails here, not in the sort.
            PARSER.parse(value.toString());
            // The wrapper key carries the raw JSON; its compareTo decides equality.
            JSONWrapper jow = new JSONWrapper(value.toString());
            output.collect(jow, new IntWritable(1));
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {
        public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
                OutputCollector<JSONWrapper, IntWritable> output,
                Reporter reporter) throws IOException {
            // Count occurrences of each (JSON-equal) key.
            int sum = 0;
            while (values.hasNext())
                sum += values.next().get();
            output.collect(jow, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setJobName("dupfilter");
        conf.setOutputKeyClass(JSONWrapper.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Thanks,
Max Lebedev
On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee <
[email protected]> wrote:
> I agree with Shahab: you have to ensure that the keys are WritableComparable
> and the values are Writable in order to be used in MR.
>
> You can have a WritableComparable implementation wrapping the actual JSON
> object.
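>
> Something like this untested sketch; the compareTo body is where your
> JSON-aware comparison would go:
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.WritableComparable;
>
> public class JSONWrapper implements WritableComparable<JSONWrapper> {
>     private Text json = new Text();
>
>     public JSONWrapper() {}                        // Hadoop needs a no-arg ctor
>     public JSONWrapper(String s) { json.set(s); }
>
>     public void write(DataOutput out) throws IOException { json.write(out); }
>     public void readFields(DataInput in) throws IOException { json.readFields(in); }
>
>     public int compareTo(JSONWrapper other) {
>         // Swap in a comparison that ignores key order if that's what you need.
>         return json.toString().compareTo(other.json.toString());
>     }
>
>     public int hashCode() {
>         // Keep consistent with compareTo so equal keys hit the same reducer.
>         return json.toString().hashCode();
>     }
>
>     public String toString() { return json.toString(); }
> }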
>
> Thanks,
> Rahul
>
>
> On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <[email protected]> wrote:
>
>> Hello,
>>
>> On 4 Jun 2013, at 23:49, Max Lebedev <[email protected]> wrote:
>>
>> Hi. I've been trying to use JSONObjects to identify duplicates in JSON
>> strings. The duplicate strings contain the same data, but not necessarily
>> in the same order. For example, the following two lines should be
>> identified as duplicates (and filtered):
>>
>> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>>
>> Can you not use the timestamp as a URI and emit them as URIs? Then you
>> would have your mapper emit the following kv:
>>
>> output.collect(ts, value);
>>
>> And you would have a straightforward reducer that can dedup based on the
>> timestamps.
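>>
>> Something along these lines (untested sketch; the map/reduce bodies slot
>> into your existing old-API classes, retyped as Mapper<LongWritable, Text,
>> Text, Text> and Reducer<Text, Text, NullWritable, Text>):
>>
>> public void map(LongWritable key, Text value,
>>         OutputCollector<Text, Text> output, Reporter reporter)
>>         throws IOException {
>>     try {
>>         JSONObject jo = new JSONObject(value.toString());
>>         // Key on the timestamp; duplicate records share the same "ts".
>>         output.collect(new Text(jo.get("ts").toString()), value);
>>     } catch (JSONException e) {
>>         e.printStackTrace();
>>     }
>> }
>>
>> public void reduce(Text ts, Iterator<Text> lines,
>>         OutputCollector<NullWritable, Text> output, Reporter reporter)
>>         throws IOException {
>>     // Emit only the first line seen for each timestamp.
>>     output.collect(NullWritable.get(), lines.next());
>> }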
>>
>> If the above doesn't work for you, I would look at the Jackson library for
>> mangling JSON in Java. Its method of using Java beans for JSON is clean
>> from a code POV and comes with lots of nice features.
>> http://stackoverflow.com/a/2255893
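>>
>> For example (Event here is a hypothetical bean whose fields mirror your
>> JSON: ts, isSecure, version, source, debug):
>>
>> ObjectMapper mapper = new ObjectMapper();
>> // One call turns the raw line into a typed bean.
>> Event event = mapper.readValue(value.toString(), Event.class);
>>
>> If Event implements equals()/hashCode() over its fields, two lines with the
>> same data compare equal no matter how the keys are ordered in the raw text.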
>>
>> P.S. In your code you are using the older MapReduce API; I would look at
>> using the newer APIs in the org.apache.hadoop.mapreduce package.
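>>
>> Under the new API a mapper looks roughly like this (sketch):
>>
>> public class DedupMapper
>>         extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
>>     @Override
>>     protected void map(LongWritable key, Text value, Context context)
>>             throws IOException, InterruptedException {
>>         // Context replaces OutputCollector and Reporter.
>>         context.write(value, value);
>>     }
>> }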
>>
>> Mischa
>>
>> This is the code:
>>
>> class DupFilter {
>>
>>     public static class Map extends MapReduceBase
>>             implements Mapper<LongWritable, Text, JSONObject, Text> {
>>         public void map(LongWritable key, Text value,
>>                 OutputCollector<JSONObject, Text> output, Reporter reporter)
>>                 throws IOException {
>>             JSONObject jo = null;
>>             try {
>>                 jo = new JSONObject(value.toString());
>>             } catch (JSONException e) {
>>                 e.printStackTrace();
>>             }
>>             // The JSONObject itself is the key, so equal objects group together.
>>             output.collect(jo, value);
>>         }
>>     }
>>
>>     public static class Reduce extends MapReduceBase
>>             implements Reducer<JSONObject, Text, NullWritable, Text> {
>>         public void reduce(JSONObject jo, Iterator<Text> lines,
>>                 OutputCollector<NullWritable, Text> output, Reporter reporter)
>>                 throws IOException {
>>             // Keep only the first line of each group of duplicates.
>>             output.collect(NullWritable.get(), lines.next());
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         JobConf conf = new JobConf(DupFilter.class);
>>         conf.setOutputKeyClass(JSONObject.class);
>>         conf.setOutputValueClass(Text.class);
>>         conf.setMapperClass(Map.class);
>>         conf.setReducerClass(Reduce.class);
>>         conf.setInputFormat(TextInputFormat.class);
>>         conf.setOutputFormat(TextOutputFormat.class);
>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>         JobClient.runJob(conf);
>>     }
>> }
>>
>> I get the following error:
>>
>> java.lang.ClassCastException: class org.json.JSONObject
>>     at java.lang.Class.asSubclass(Class.java:3027)
>>     at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>
>> It looks like it has something to do with conf.setOutputKeyClass(). Am I
>> doing something wrong here?
>>
>>
>> Thanks,
>>
>> Max Lebedev
>>
>>
>> _______________________________
>> Mischa Tuffield PhD
>> http://mmt.me.uk/
>> @mischat
>>